Merge pull request #11315 from atom/serialize-async-git

Serialize async git
This commit is contained in:
Josh Abernathy 2016-03-30 20:42:23 -04:00
commit a15da5fa01
3 changed files with 320 additions and 143 deletions

View File

@ -0,0 +1,66 @@
/** @babel */
import ResourcePool from '../src/resource-pool'
import {it} from './async-spec-helpers'
describe('ResourcePool', () => {
let queue
beforeEach(() => {
queue = new ResourcePool([{}])
})
describe('.enqueue', () => {
it('calls the enqueued function', async () => {
let called = false
await queue.enqueue(() => {
called = true
return Promise.resolve()
})
expect(called).toBe(true)
})
it('forwards values from the inner promise', async () => {
const result = await queue.enqueue(() => Promise.resolve(42))
expect(result).toBe(42)
})
it('forwards errors from the inner promise', async () => {
let threw = false
try {
await queue.enqueue(() => Promise.reject(new Error('down with the sickness')))
} catch (e) {
threw = true
}
expect(threw).toBe(true)
})
it('continues to dequeue work after a promise has been rejected', async () => {
try {
await queue.enqueue(() => Promise.reject(new Error('down with the sickness')))
} catch (e) {}
const result = await queue.enqueue(() => Promise.resolve(42))
expect(result).toBe(42)
})
it('queues up work', async () => {
let resolve = null
queue.enqueue(() => {
return new Promise((resolve_, reject) => {
resolve = resolve_
})
})
expect(queue.getQueueDepth()).toBe(0)
queue.enqueue(() => new Promise((resolve, reject) => {}))
expect(queue.getQueueDepth()).toBe(1)
resolve()
waitsFor(() => queue.getQueueDepth() === 0)
})
})
})

View File

@ -3,6 +3,7 @@
import fs from 'fs-plus'
import path from 'path'
import Git from 'nodegit'
import ResourcePool from './resource-pool'
import {Emitter, CompositeDisposable, Disposable} from 'event-kit'
const modifiedStatusFlags = Git.Status.STATUS.WT_MODIFIED | Git.Status.STATUS.INDEX_MODIFIED | Git.Status.STATUS.WT_DELETED | Git.Status.STATUS.INDEX_DELETED | Git.Status.STATUS.WT_TYPECHANGE | Git.Status.STATUS.INDEX_TYPECHANGE
@ -38,7 +39,8 @@ export default class GitRepositoryAsync {
}
constructor (_path, options = {}) {
Git.enableThreadSafety()
// We'll serialize our access manually.
Git.disableThreadSafety()
this.emitter = new Emitter()
this.subscriptions = new CompositeDisposable()
@ -50,6 +52,11 @@ export default class GitRepositoryAsync {
this._openExactPath = options.openExactPath || false
this.repoPromise = this.openRepository()
// NB: We don't currently _use_ the pooled object. But by giving it one
// thing, we're really just serializing all the work. Down the road, we
// could open multiple connections to the repository.
this.repoPool = new ResourcePool([this.repoPromise])
this.isCaseInsensitive = fs.isCaseInsensitive()
this.upstream = {}
this.submodules = {}
@ -81,6 +88,7 @@ export default class GitRepositoryAsync {
this.emitter.dispose()
this.emitter = null
}
if (this.subscriptions) {
this.subscriptions.dispose()
this.subscriptions = null
@ -260,10 +268,12 @@ export default class GitRepositoryAsync {
// Public: Returns a {Promise} which resolves to whether the given branch
// exists.
hasBranch (branch) {
return this.getRepo()
.then(repo => repo.getBranch(branch))
.then(branch => branch != null)
.catch(_ => false)
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => repo.getBranch(branch))
.then(branch => branch != null)
.catch(_ => false)
})
}
// Public: Retrieves a shortened version of the HEAD reference value.
@ -277,9 +287,11 @@ export default class GitRepositoryAsync {
//
// Returns a {Promise} which resolves to a {String}.
getShortHead (_path) {
return this.getRepo(_path)
.then(repo => repo.getCurrentBranch())
.then(branch => branch.shorthand())
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => repo.getCurrentBranch())
.then(branch => branch.shorthand())
})
}
// Public: Is the given path a submodule in the repository?
@ -289,14 +301,18 @@ export default class GitRepositoryAsync {
// Returns a {Promise} that resolves true if the given path is a submodule in
// the repository.
isSubmodule (_path) {
return this.getRepo()
.then(repo => repo.openIndex())
.then(index => Promise.all([index, this.relativizeToWorkingDirectory(_path)]))
.then(([index, relativePath]) => {
const entry = index.getByPath(relativePath)
if (!entry) return false
return this.relativizeToWorkingDirectory(_path)
.then(relativePath => {
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => repo.openIndex())
.then(index => {
const entry = index.getByPath(relativePath)
if (!entry) return false
return entry.mode === submoduleMode
return entry.mode === submoduleMode
})
})
})
}
@ -311,16 +327,18 @@ export default class GitRepositoryAsync {
// * `ahead` The {Number} of commits ahead.
// * `behind` The {Number} of commits behind.
getAheadBehindCount (reference, _path) {
return this.getRepo(_path)
.then(repo => Promise.all([repo, repo.getBranch(reference)]))
.then(([repo, local]) => {
const upstream = Git.Branch.upstream(local)
return Promise.all([repo, local, upstream])
})
.then(([repo, local, upstream]) => {
return Git.Graph.aheadBehind(repo, local.target(), upstream.target())
})
.catch(_ => ({ahead: 0, behind: 0}))
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => Promise.all([repo, repo.getBranch(reference)]))
.then(([repo, local]) => {
const upstream = Git.Branch.upstream(local)
return Promise.all([repo, local, upstream])
})
.then(([repo, local, upstream]) => {
return Git.Graph.aheadBehind(repo, local.target(), upstream.target())
})
.catch(_ => ({ahead: 0, behind: 0}))
})
}
// Public: Get the cached ahead/behind commit counts for the current branch's
@ -352,10 +370,12 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to the {String} git configuration value
// specified by the key.
getConfigValue (key, _path) {
return this.getRepo(_path)
.then(repo => repo.configSnapshot())
.then(config => config.getStringBuf(key))
.catch(_ => null)
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => repo.configSnapshot())
.then(config => config.getStringBuf(key))
.catch(_ => null)
})
}
// Public: Get the URL for the 'origin' remote.
@ -378,9 +398,11 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to a {String} branch name such as
// `refs/remotes/origin/master`.
getUpstreamBranch (_path) {
return this.getRepo(_path)
.then(repo => repo.getCurrentBranch())
.then(branch => Git.Branch.upstream(branch))
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => repo.getCurrentBranch())
.then(branch => Git.Branch.upstream(branch))
})
}
// Public: Gets all the local and remote references.
@ -393,23 +415,25 @@ export default class GitRepositoryAsync {
// * `remotes` An {Array} of remote reference names.
// * `tags` An {Array} of tag reference names.
getReferences (_path) {
return this.getRepo(_path)
.then(repo => repo.getReferences(Git.Reference.TYPE.LISTALL))
.then(refs => {
const heads = []
const remotes = []
const tags = []
for (const ref of refs) {
if (ref.isTag()) {
tags.push(ref.name())
} else if (ref.isRemote()) {
remotes.push(ref.name())
} else if (ref.isBranch()) {
heads.push(ref.name())
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => repo.getReferences(Git.Reference.TYPE.LISTALL))
.then(refs => {
const heads = []
const remotes = []
const tags = []
for (const ref of refs) {
if (ref.isTag()) {
tags.push(ref.name())
} else if (ref.isRemote()) {
remotes.push(ref.name())
} else if (ref.isBranch()) {
heads.push(ref.name())
}
}
}
return {heads, remotes, tags}
})
return {heads, remotes, tags}
})
})
}
// Public: Get the SHA for the given reference.
@ -421,9 +445,11 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to the current {String} SHA for the
// given reference.
getReferenceTarget (reference, _path) {
return this.getRepo(_path)
.then(repo => Git.Reference.nameToId(repo, reference))
.then(oid => oid.tostrS())
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => Git.Reference.nameToId(repo, reference))
.then(oid => oid.tostrS())
})
}
// Reading Status
@ -460,12 +486,17 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to a {Boolean} that's true if the `path`
// is ignored.
isPathIgnored (_path) {
return Promise.all([this.getRepo(), this.getWorkingDirectory()])
.then(([repo, wd]) => {
const relativePath = this.relativize(_path, wd)
return Git.Ignore.pathIsIgnored(repo, relativePath)
return this.getWorkingDirectory()
.then(wd => {
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => {
const relativePath = this.relativize(_path, wd)
return Git.Ignore.pathIsIgnored(repo, relativePath)
})
.then(ignored => Boolean(ignored))
})
})
.then(ignored => Boolean(ignored))
}
// Get the status of a directory in the repository's working directory.
@ -501,8 +532,8 @@ export default class GitRepositoryAsync {
// status bit for the path.
refreshStatusForPath (_path) {
let relativePath
return Promise.all([this.getRepo(), this.getWorkingDirectory()])
.then(([repo, wd]) => {
return this.getWorkingDirectory()
.then(wd => {
relativePath = this.relativize(_path, wd)
return this._getStatus([relativePath])
})
@ -609,34 +640,39 @@ export default class GitRepositoryAsync {
// * `added` The {Number} of added lines.
// * `deleted` The {Number} of deleted lines.
getDiffStats (_path) {
return this.getRepo(_path)
.then(repo => Promise.all([repo, repo.getHeadCommit()]))
.then(([repo, headCommit]) => Promise.all([repo, headCommit.getTree(), this.getWorkingDirectory(_path)]))
.then(([repo, tree, wd]) => {
const options = new Git.DiffOptions()
options.contextLines = 0
options.flags = Git.Diff.OPTION.DISABLE_PATHSPEC_MATCH
options.pathspec = this.relativize(_path, wd)
if (process.platform === 'win32') {
// Ignore eol of line differences on windows so that files checked in
// as LF don't report every line modified when the text contains CRLF
// endings.
options.flags |= Git.Diff.OPTION.IGNORE_WHITESPACE_EOL
}
return Git.Diff.treeToWorkdir(repo, tree, options)
})
.then(diff => this._getDiffLines(diff))
.then(lines => {
const stats = {added: 0, deleted: 0}
for (const line of lines) {
const origin = line.origin()
if (origin === Git.Diff.LINE.ADDITION) {
stats.added++
} else if (origin === Git.Diff.LINE.DELETION) {
stats.deleted++
}
}
return stats
return this.getWorkingDirectory(_path)
.then(wd => {
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => Promise.all([repo, repo.getHeadCommit()]))
.then(([repo, headCommit]) => Promise.all([repo, headCommit.getTree()]))
.then(([repo, tree]) => {
const options = new Git.DiffOptions()
options.contextLines = 0
options.flags = Git.Diff.OPTION.DISABLE_PATHSPEC_MATCH
options.pathspec = this.relativize(_path, wd)
if (process.platform === 'win32') {
// Ignore eol of line differences on windows so that files checked in
// as LF don't report every line modified when the text contains CRLF
// endings.
options.flags |= Git.Diff.OPTION.IGNORE_WHITESPACE_EOL
}
return Git.Diff.treeToWorkdir(repo, tree, options)
})
.then(diff => this._getDiffLines(diff))
.then(lines => {
const stats = {added: 0, deleted: 0}
for (const line of lines) {
const origin = line.origin()
if (origin === Git.Diff.LINE.ADDITION) {
stats.added++
} else if (origin === Git.Diff.LINE.DELETION) {
stats.deleted++
}
}
return stats
})
})
})
}
@ -652,24 +688,29 @@ export default class GitRepositoryAsync {
// * `oldLines` The {Number} of lines in the old hunk.
// * `newLines` The {Number} of lines in the new hunk
getLineDiffs (_path, text) {
let relativePath = null
return Promise.all([this.getRepo(_path), this.getWorkingDirectory(_path)])
.then(([repo, wd]) => {
relativePath = this.relativize(_path, wd)
return repo.getHeadCommit()
})
.then(commit => commit.getEntry(relativePath))
.then(entry => entry.getBlob())
.then(blob => {
const options = new Git.DiffOptions()
options.contextLines = 0
if (process.platform === 'win32') {
// Ignore eol of line differences on windows so that files checked in
// as LF don't report every line modified when the text contains CRLF
// endings.
options.flags = Git.Diff.OPTION.IGNORE_WHITESPACE_EOL
}
return this._diffBlobToBuffer(blob, text, options)
return this.getWorkingDirectory(_path)
.then(wd => {
let relativePath = null
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => {
relativePath = this.relativize(_path, wd)
return repo.getHeadCommit()
})
.then(commit => commit.getEntry(relativePath))
.then(entry => entry.getBlob())
.then(blob => {
const options = new Git.DiffOptions()
options.contextLines = 0
if (process.platform === 'win32') {
// Ignore eol of line differences on windows so that files checked in
// as LF don't report every line modified when the text contains CRLF
// endings.
options.flags = Git.Diff.OPTION.IGNORE_WHITESPACE_EOL
}
return this._diffBlobToBuffer(blob, text, options)
})
})
})
}
@ -691,12 +732,17 @@ export default class GitRepositoryAsync {
// Returns a {Promise} that resolves or rejects depending on whether the
// method was successful.
checkoutHead (_path) {
return Promise.all([this.getRepo(_path), this.getWorkingDirectory(_path)])
.then(([repo, wd]) => {
const checkoutOptions = new Git.CheckoutOptions()
checkoutOptions.paths = [this.relativize(_path, wd)]
checkoutOptions.checkoutStrategy = Git.Checkout.STRATEGY.FORCE | Git.Checkout.STRATEGY.DISABLE_PATHSPEC_MATCH
return Git.Checkout.head(repo, checkoutOptions)
return this.getWorkingDirectory(_path)
.then(wd => {
return this.repoPool.enqueue(() => {
return this.getRepo(_path)
.then(repo => {
const checkoutOptions = new Git.CheckoutOptions()
checkoutOptions.paths = [this.relativize(_path, wd)]
checkoutOptions.checkoutStrategy = Git.Checkout.STRATEGY.FORCE | Git.Checkout.STRATEGY.DISABLE_PATHSPEC_MATCH
return Git.Checkout.head(repo, checkoutOptions)
})
})
})
.then(() => this.refreshStatusForPath(_path))
}
@ -709,17 +755,19 @@ export default class GitRepositoryAsync {
//
// Returns a {Promise} that resolves if the method was successful.
checkoutReference (reference, create) {
return this.getRepo()
.then(repo => repo.checkoutBranch(reference))
.catch(error => {
if (create) {
return this._createBranch(reference)
.then(_ => this.checkoutReference(reference, false))
} else {
throw error
}
})
.then(_ => null)
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => repo.checkoutBranch(reference))
})
.catch(error => {
if (create) {
return this._createBranch(reference)
.then(_ => this.checkoutReference(reference, false))
} else {
throw error
}
})
.then(_ => null)
}
// Private
@ -745,9 +793,11 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to a {NodeGit.Ref} reference to the
// created branch.
_createBranch (name) {
return this.getRepo()
.then(repo => Promise.all([repo, repo.getHeadCommit()]))
.then(([repo, commit]) => repo.createBranch(name, commit))
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => Promise.all([repo, repo.getHeadCommit()]))
.then(([repo, commit]) => repo.createBranch(name, commit))
})
}
// Get all the hunks in the diff.
@ -804,14 +854,16 @@ export default class GitRepositoryAsync {
// Returns a {Promise} which resolves to a {boolean} indicating whether the
// branch name changed.
_refreshBranch () {
return this.getRepo()
.then(repo => repo.getCurrentBranch())
.then(ref => ref.name())
.then(branchName => {
const changed = branchName !== this.branch
this.branch = branchName
return changed
})
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => repo.getCurrentBranch())
.then(ref => ref.name())
.then(branchName => {
const changed = branchName !== this.branch
this.branch = branchName
return changed
})
})
}
// Refresh the cached ahead/behind count with the given branch.
@ -1077,18 +1129,20 @@ export default class GitRepositoryAsync {
//
// Returns a {Promise} which resolves to an {Array} of {NodeGit.StatusFile}
// statuses for the paths.
_getStatus (paths, repo) {
return this.getRepo()
.then(repo => {
const opts = {
flags: Git.Status.OPT.INCLUDE_UNTRACKED | Git.Status.OPT.RECURSE_UNTRACKED_DIRS
}
_getStatus (paths) {
return this.repoPool.enqueue(() => {
return this.getRepo()
.then(repo => {
const opts = {
flags: Git.Status.OPT.INCLUDE_UNTRACKED | Git.Status.OPT.RECURSE_UNTRACKED_DIRS
}
if (paths) {
opts.pathspec = paths
}
if (paths) {
opts.pathspec = paths
}
return repo.getStatusExt(opts)
})
return repo.getStatusExt(opts)
})
})
}
}

57
src/resource-pool.js Normal file
View File

@ -0,0 +1,57 @@
/** @babel */
// Manages a pool of some resource.
export default class ResourcePool {
constructor (pool) {
this.pool = pool
this.queue = []
}
// Enqueue the given function. The function will be given an object from the
// pool. The function must return a {Promise}.
enqueue (fn) {
let resolve = null
let reject = null
const wrapperPromise = new Promise((resolve_, reject_) => {
resolve = resolve_
reject = reject_
})
this.queue.push(this.wrapFunction(fn, resolve, reject))
this.dequeueIfAble()
return wrapperPromise
}
wrapFunction (fn, resolve, reject) {
return (resource) => {
const promise = fn(resource)
promise
.then(result => {
resolve(result)
this.taskDidComplete(resource)
}, error => {
reject(error)
this.taskDidComplete(resource)
})
}
}
taskDidComplete (resource) {
this.pool.push(resource)
this.dequeueIfAble()
}
dequeueIfAble () {
if (!this.pool.length || !this.queue.length) return
const fn = this.queue.shift()
const resource = this.pool.shift()
fn(resource)
}
getQueueDepth () { return this.queue.length }
}