mirror of
https://github.com/numtide/treefmt.git
synced 2024-08-16 12:20:25 +03:00
commit
23e563b239
15
cache/cache.go
vendored
15
cache/cache.go
vendored
@ -283,21 +283,12 @@ func Update(files []*walk.File) error {
|
|||||||
bucket := tx.Bucket([]byte(pathsBucket))
|
bucket := tx.Bucket([]byte(pathsBucket))
|
||||||
|
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
currentInfo, err := os.Stat(f.Path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !(f.Info.ModTime() == currentInfo.ModTime() && f.Info.Size() == currentInfo.Size()) {
|
|
||||||
stats.Add(stats.Formatted, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := Entry{
|
entry := Entry{
|
||||||
Size: currentInfo.Size(),
|
Size: f.Info.Size(),
|
||||||
Modified: currentInfo.ModTime(),
|
Modified: f.Info.ModTime(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = putEntry(bucket, f.RelPath, &entry); err != nil {
|
if err := putEntry(bucket, f.RelPath, &entry); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,7 @@ type Format struct {
|
|||||||
globalExcludes []glob.Glob
|
globalExcludes []glob.Glob
|
||||||
|
|
||||||
filesCh chan *walk.File
|
filesCh chan *walk.File
|
||||||
|
formattedCh chan *walk.File
|
||||||
processedCh chan *walk.File
|
processedCh chan *walk.File
|
||||||
}
|
}
|
||||||
|
|
||||||
|
218
cli/format.go
218
cli/format.go
@ -148,13 +148,15 @@ func (f *Format) Run() (err error) {
|
|||||||
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
|
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
|
||||||
f.filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())
|
f.filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())
|
||||||
|
|
||||||
|
// create a channel for files that have been formatted
|
||||||
|
f.formattedCh = make(chan *walk.File, cap(f.filesCh))
|
||||||
|
|
||||||
// create a channel for files that have been processed
|
// create a channel for files that have been processed
|
||||||
f.processedCh = make(chan *walk.File, cap(f.filesCh))
|
f.processedCh = make(chan *walk.File, cap(f.filesCh))
|
||||||
|
|
||||||
// start concurrent processing tasks in reverse order
|
// start concurrent processing tasks in reverse order
|
||||||
if !f.NoCache {
|
eg.Go(f.updateCache(ctx))
|
||||||
eg.Go(f.updateCache(ctx))
|
eg.Go(f.detectFormatted(ctx))
|
||||||
}
|
|
||||||
eg.Go(f.applyFormatters(ctx))
|
eg.Go(f.applyFormatters(ctx))
|
||||||
eg.Go(f.walkFilesystem(ctx))
|
eg.Go(f.walkFilesystem(ctx))
|
||||||
|
|
||||||
@ -162,83 +164,6 @@ func (f *Format) Run() (err error) {
|
|||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Format) updateCache(ctx context.Context) func() error {
|
|
||||||
return func() error {
|
|
||||||
// used to batch updates for more efficient txs
|
|
||||||
batch := make([]*walk.File, 0, BatchSize)
|
|
||||||
|
|
||||||
// apply a batch
|
|
||||||
processBatch := func() error {
|
|
||||||
if f.Stdin {
|
|
||||||
// do nothing
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := cache.Update(batch); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
batch = batch[:0]
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
LOOP:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
// detect ctx cancellation
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
// respond to processed files
|
|
||||||
case file, ok := <-f.processedCh:
|
|
||||||
if !ok {
|
|
||||||
// channel has been closed, no further files to process
|
|
||||||
break LOOP
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.Stdin {
|
|
||||||
// dump file into stdout
|
|
||||||
f, err := os.Open(file.Path)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to open %s: %w", file.Path, err)
|
|
||||||
}
|
|
||||||
if _, err = io.Copy(os.Stdout, f); err != nil {
|
|
||||||
return fmt.Errorf("failed to copy %s to stdout: %w", file.Path, err)
|
|
||||||
}
|
|
||||||
if err = os.Remove(f.Name()); err != nil {
|
|
||||||
return fmt.Errorf("failed to remove temp file %s: %w", file.Path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.Add(stats.Formatted, 1)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// append to batch and process if we have enough
|
|
||||||
batch = append(batch, file)
|
|
||||||
if len(batch) == BatchSize {
|
|
||||||
if err := processBatch(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// final flush
|
|
||||||
if err := processBatch(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
|
|
||||||
if f.FailOnChange && stats.Value(stats.Formatted) != 0 {
|
|
||||||
return ErrFailOnChange
|
|
||||||
}
|
|
||||||
|
|
||||||
// print stats to stdout unless we are processing stdin and printing the results to stdout
|
|
||||||
if !f.Stdin {
|
|
||||||
stats.Print()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *Format) walkFilesystem(ctx context.Context) func() error {
|
func (f *Format) walkFilesystem(ctx context.Context) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
@ -368,9 +293,9 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pass each file to the processed channel
|
// pass each file to the formatted channel
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
f.processedCh <- task.File
|
f.formattedCh <- task.File
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -392,7 +317,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
|
|||||||
return func() error {
|
return func() error {
|
||||||
defer func() {
|
defer func() {
|
||||||
// close processed channel
|
// close processed channel
|
||||||
close(f.processedCh)
|
close(f.formattedCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// iterate the files channel
|
// iterate the files channel
|
||||||
@ -402,7 +327,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
|
|||||||
if format.PathMatches(file.RelPath, f.globalExcludes) {
|
if format.PathMatches(file.RelPath, f.globalExcludes) {
|
||||||
log.Debugf("path matched global excludes: %s", file.RelPath)
|
log.Debugf("path matched global excludes: %s", file.RelPath)
|
||||||
// mark it as processed and continue to the next
|
// mark it as processed and continue to the next
|
||||||
f.processedCh <- file
|
f.formattedCh <- file
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -421,7 +346,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
|
|||||||
}
|
}
|
||||||
log.Logf(f.OnUnmatched, "no formatter for path: %s", file.RelPath)
|
log.Logf(f.OnUnmatched, "no formatter for path: %s", file.RelPath)
|
||||||
// mark it as processed and continue to the next
|
// mark it as processed and continue to the next
|
||||||
f.processedCh <- file
|
f.formattedCh <- file
|
||||||
} else {
|
} else {
|
||||||
// record the match
|
// record the match
|
||||||
stats.Add(stats.Matched, 1)
|
stats.Add(stats.Matched, 1)
|
||||||
@ -444,6 +369,129 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Format) detectFormatted(ctx context.Context) func() error {
|
||||||
|
return func() error {
|
||||||
|
defer func() {
|
||||||
|
// close formatted channel
|
||||||
|
close(f.processedCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
|
||||||
|
// detect ctx cancellation
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
// take the next file that has been processed
|
||||||
|
case file, ok := <-f.formattedCh:
|
||||||
|
if !ok {
|
||||||
|
// channel has been closed, no further files to process
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// look up current file info
|
||||||
|
currentInfo, err := os.Stat(file.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stat processed file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if the file has changed
|
||||||
|
if !(file.Info.ModTime() == currentInfo.ModTime() && file.Info.Size() == currentInfo.Size()) {
|
||||||
|
// record the change
|
||||||
|
stats.Add(stats.Formatted, 1)
|
||||||
|
// log the change for diagnostics
|
||||||
|
log.Debugf("file has been changed: %s", file.Path)
|
||||||
|
// update the file info
|
||||||
|
file.Info = currentInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark as processed
|
||||||
|
f.processedCh <- file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Format) updateCache(ctx context.Context) func() error {
|
||||||
|
return func() error {
|
||||||
|
// used to batch updates for more efficient txs
|
||||||
|
batch := make([]*walk.File, 0, BatchSize)
|
||||||
|
|
||||||
|
// apply a batch
|
||||||
|
processBatch := func() error {
|
||||||
|
// pass the batch to the cache for updating
|
||||||
|
if err := cache.Update(batch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
batch = batch[:0]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we are processing from stdin that means we are outputting to stdout, no caching involved
|
||||||
|
// if f.NoCache is set that means either the user explicitly disabled the cache or we failed to open on
|
||||||
|
if f.Stdin || f.NoCache {
|
||||||
|
// do nothing
|
||||||
|
processBatch = func() error { return nil }
|
||||||
|
}
|
||||||
|
|
||||||
|
LOOP:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// detect ctx cancellation
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
// respond to formatted files
|
||||||
|
case file, ok := <-f.processedCh:
|
||||||
|
if !ok {
|
||||||
|
// channel has been closed, no further files to process
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Stdin {
|
||||||
|
// dump file into stdout
|
||||||
|
f, err := os.Open(file.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open %s: %w", file.Path, err)
|
||||||
|
}
|
||||||
|
if _, err = io.Copy(os.Stdout, f); err != nil {
|
||||||
|
return fmt.Errorf("failed to copy %s to stdout: %w", file.Path, err)
|
||||||
|
}
|
||||||
|
if err = os.Remove(f.Name()); err != nil {
|
||||||
|
return fmt.Errorf("failed to remove temp file %s: %w", file.Path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// append to batch and process if we have enough
|
||||||
|
batch = append(batch, file)
|
||||||
|
if len(batch) == BatchSize {
|
||||||
|
if err := processBatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// final flush
|
||||||
|
if err := processBatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
|
||||||
|
if f.FailOnChange && stats.Value(stats.Formatted) != 0 {
|
||||||
|
return ErrFailOnChange
|
||||||
|
}
|
||||||
|
|
||||||
|
// print stats to stdout unless we are processing stdin and printing the results to stdout
|
||||||
|
if !f.Stdin {
|
||||||
|
stats.Print()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func findUp(searchDir string, fileName string) (path string, dir string, err error) {
|
func findUp(searchDir string, fileName string) (path string, dir string, err error) {
|
||||||
for _, dir := range eachDir(searchDir) {
|
for _, dir := range eachDir(searchDir) {
|
||||||
path := filepath.Join(dir, fileName)
|
path := filepath.Join(dir, fileName)
|
||||||
|
@ -364,6 +364,11 @@ func TestFailOnChange(t *testing.T) {
|
|||||||
test.WriteConfig(t, configPath, cfg)
|
test.WriteConfig(t, configPath, cfg)
|
||||||
_, err := cmd(t, "--fail-on-change", "--config-file", configPath, "--tree-root", tempDir)
|
_, err := cmd(t, "--fail-on-change", "--config-file", configPath, "--tree-root", tempDir)
|
||||||
as.ErrorIs(err, ErrFailOnChange)
|
as.ErrorIs(err, ErrFailOnChange)
|
||||||
|
|
||||||
|
// test with no cache
|
||||||
|
test.WriteConfig(t, configPath, cfg)
|
||||||
|
_, err = cmd(t, "--fail-on-change", "--config-file", configPath, "--tree-root", tempDir, "--no-cache")
|
||||||
|
as.ErrorIs(err, ErrFailOnChange)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBustCacheOnFormatterChange(t *testing.T) {
|
func TestBustCacheOnFormatterChange(t *testing.T) {
|
||||||
|
@ -82,7 +82,7 @@ func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {
|
|||||||
|
|
||||||
//
|
//
|
||||||
|
|
||||||
f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))
|
f.log.Infof("%v file(s) processed in %v", len(tasks), time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user