mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 16:57:49 +03:00
f05ee67a9f
Before this patch, "hg sign" of gpg extension executes/evaluates below without acquisition of wlock. - repo.dirstate.parents() - '.hgsigs' not in repo.dirstate It may cause unintentional result, if another command runs parallelly (see also issue4368). To avoid this issue, this patch makes "hg sign" of gpg extension acquire wlock before processing.
313 lines
10 KiB
Python
313 lines
10 KiB
Python
# Copyright 2005, 2006 Benoit Boissinot <benoit.boissinot@ens-lyon.org>
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2 or any later version.
|
|
|
|
'''commands to sign and verify changesets'''
|
|
|
|
import os, tempfile, binascii
|
|
from mercurial import util, commands, match, cmdutil, error
|
|
from mercurial import node as hgnode
|
|
from mercurial.i18n import _
|
|
from mercurial import lock as lockmod
|
|
|
|
cmdtable = {}
|
|
command = cmdutil.command(cmdtable)
|
|
# Note for extension authors: ONLY specify testedwith = 'internal' for
|
|
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
|
|
# be specifying the version(s) of Mercurial they are tested with, or
|
|
# leave the attribute unspecified.
|
|
testedwith = 'internal'
|
|
|
|
class gpg(object):
|
|
def __init__(self, path, key=None):
|
|
self.path = path
|
|
self.key = (key and " --local-user \"%s\"" % key) or ""
|
|
|
|
def sign(self, data):
|
|
gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
|
|
return util.filter(data, gpgcmd)
|
|
|
|
def verify(self, data, sig):
|
|
""" returns of the good and bad signatures"""
|
|
sigfile = datafile = None
|
|
try:
|
|
# create temporary files
|
|
fd, sigfile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".sig")
|
|
fp = os.fdopen(fd, 'wb')
|
|
fp.write(sig)
|
|
fp.close()
|
|
fd, datafile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".txt")
|
|
fp = os.fdopen(fd, 'wb')
|
|
fp.write(data)
|
|
fp.close()
|
|
gpgcmd = ("%s --logger-fd 1 --status-fd 1 --verify "
|
|
"\"%s\" \"%s\"" % (self.path, sigfile, datafile))
|
|
ret = util.filter("", gpgcmd)
|
|
finally:
|
|
for f in (sigfile, datafile):
|
|
try:
|
|
if f:
|
|
os.unlink(f)
|
|
except OSError:
|
|
pass
|
|
keys = []
|
|
key, fingerprint = None, None
|
|
for l in ret.splitlines():
|
|
# see DETAILS in the gnupg documentation
|
|
# filter the logger output
|
|
if not l.startswith("[GNUPG:]"):
|
|
continue
|
|
l = l[9:]
|
|
if l.startswith("VALIDSIG"):
|
|
# fingerprint of the primary key
|
|
fingerprint = l.split()[10]
|
|
elif l.startswith("ERRSIG"):
|
|
key = l.split(" ", 3)[:2]
|
|
key.append("")
|
|
fingerprint = None
|
|
elif (l.startswith("GOODSIG") or
|
|
l.startswith("EXPSIG") or
|
|
l.startswith("EXPKEYSIG") or
|
|
l.startswith("BADSIG")):
|
|
if key is not None:
|
|
keys.append(key + [fingerprint])
|
|
key = l.split(" ", 2)
|
|
fingerprint = None
|
|
if key is not None:
|
|
keys.append(key + [fingerprint])
|
|
return keys
|
|
|
|
def newgpg(ui, **opts):
|
|
"""create a new gpg instance"""
|
|
gpgpath = ui.config("gpg", "cmd", "gpg")
|
|
gpgkey = opts.get('key')
|
|
if not gpgkey:
|
|
gpgkey = ui.config("gpg", "key", None)
|
|
return gpg(gpgpath, gpgkey)
|
|
|
|
def sigwalk(repo):
|
|
"""
|
|
walk over every sigs, yields a couple
|
|
((node, version, sig), (filename, linenumber))
|
|
"""
|
|
def parsefile(fileiter, context):
|
|
ln = 1
|
|
for l in fileiter:
|
|
if not l:
|
|
continue
|
|
yield (l.split(" ", 2), (context, ln))
|
|
ln += 1
|
|
|
|
# read the heads
|
|
fl = repo.file(".hgsigs")
|
|
for r in reversed(fl.heads()):
|
|
fn = ".hgsigs|%s" % hgnode.short(r)
|
|
for item in parsefile(fl.read(r).splitlines(), fn):
|
|
yield item
|
|
try:
|
|
# read local signatures
|
|
fn = "localsigs"
|
|
for item in parsefile(repo.vfs(fn), fn):
|
|
yield item
|
|
except IOError:
|
|
pass
|
|
|
|
def getkeys(ui, repo, mygpg, sigdata, context):
|
|
"""get the keys who signed a data"""
|
|
fn, ln = context
|
|
node, version, sig = sigdata
|
|
prefix = "%s:%d" % (fn, ln)
|
|
node = hgnode.bin(node)
|
|
|
|
data = node2txt(repo, node, version)
|
|
sig = binascii.a2b_base64(sig)
|
|
keys = mygpg.verify(data, sig)
|
|
|
|
validkeys = []
|
|
# warn for expired key and/or sigs
|
|
for key in keys:
|
|
if key[0] == "ERRSIG":
|
|
ui.write(_("%s Unknown key ID \"%s\"\n")
|
|
% (prefix, shortkey(ui, key[1][:15])))
|
|
continue
|
|
if key[0] == "BADSIG":
|
|
ui.write(_("%s Bad signature from \"%s\"\n") % (prefix, key[2]))
|
|
continue
|
|
if key[0] == "EXPSIG":
|
|
ui.write(_("%s Note: Signature has expired"
|
|
" (signed by: \"%s\")\n") % (prefix, key[2]))
|
|
elif key[0] == "EXPKEYSIG":
|
|
ui.write(_("%s Note: This key has expired"
|
|
" (signed by: \"%s\")\n") % (prefix, key[2]))
|
|
validkeys.append((key[1], key[2], key[3]))
|
|
return validkeys
|
|
|
|
@command("sigs", [], _('hg sigs'))
|
|
def sigs(ui, repo):
|
|
"""list signed changesets"""
|
|
mygpg = newgpg(ui)
|
|
revs = {}
|
|
|
|
for data, context in sigwalk(repo):
|
|
node, version, sig = data
|
|
fn, ln = context
|
|
try:
|
|
n = repo.lookup(node)
|
|
except KeyError:
|
|
ui.warn(_("%s:%d node does not exist\n") % (fn, ln))
|
|
continue
|
|
r = repo.changelog.rev(n)
|
|
keys = getkeys(ui, repo, mygpg, data, context)
|
|
if not keys:
|
|
continue
|
|
revs.setdefault(r, [])
|
|
revs[r].extend(keys)
|
|
for rev in sorted(revs, reverse=True):
|
|
for k in revs[rev]:
|
|
r = "%5d:%s" % (rev, hgnode.hex(repo.changelog.node(rev)))
|
|
ui.write("%-30s %s\n" % (keystr(ui, k), r))
|
|
|
|
@command("sigcheck", [], _('hg sigcheck REV'))
|
|
def sigcheck(ui, repo, rev):
|
|
"""verify all the signatures there may be for a particular revision"""
|
|
mygpg = newgpg(ui)
|
|
rev = repo.lookup(rev)
|
|
hexrev = hgnode.hex(rev)
|
|
keys = []
|
|
|
|
for data, context in sigwalk(repo):
|
|
node, version, sig = data
|
|
if node == hexrev:
|
|
k = getkeys(ui, repo, mygpg, data, context)
|
|
if k:
|
|
keys.extend(k)
|
|
|
|
if not keys:
|
|
ui.write(_("no valid signature for %s\n") % hgnode.short(rev))
|
|
return
|
|
|
|
# print summary
|
|
ui.write("%s is signed by:\n" % hgnode.short(rev))
|
|
for key in keys:
|
|
ui.write(" %s\n" % keystr(ui, key))
|
|
|
|
def keystr(ui, key):
|
|
"""associate a string to a key (username, comment)"""
|
|
keyid, user, fingerprint = key
|
|
comment = ui.config("gpg", fingerprint, None)
|
|
if comment:
|
|
return "%s (%s)" % (user, comment)
|
|
else:
|
|
return user
|
|
|
|
@command("sign",
|
|
[('l', 'local', None, _('make the signature local')),
|
|
('f', 'force', None, _('sign even if the sigfile is modified')),
|
|
('', 'no-commit', None, _('do not commit the sigfile after signing')),
|
|
('k', 'key', '',
|
|
_('the key id to sign with'), _('ID')),
|
|
('m', 'message', '',
|
|
_('use text as commit message'), _('TEXT')),
|
|
('e', 'edit', False, _('invoke editor on commit messages')),
|
|
] + commands.commitopts2,
|
|
_('hg sign [OPTION]... [REV]...'))
|
|
def sign(ui, repo, *revs, **opts):
|
|
"""add a signature for the current or given revision
|
|
|
|
If no revision is given, the parent of the working directory is used,
|
|
or tip if no revision is checked out.
|
|
|
|
The ``gpg.cmd`` config setting can be used to specify the command
|
|
to run. A default key can be specified with ``gpg.key``.
|
|
|
|
See :hg:`help dates` for a list of formats valid for -d/--date.
|
|
"""
|
|
wlock = None
|
|
try:
|
|
wlock = repo.wlock()
|
|
return _dosign(ui, repo, *revs, **opts)
|
|
finally:
|
|
lockmod.release(wlock)
|
|
|
|
def _dosign(ui, repo, *revs, **opts):
|
|
mygpg = newgpg(ui, **opts)
|
|
sigver = "0"
|
|
sigmessage = ""
|
|
|
|
date = opts.get('date')
|
|
if date:
|
|
opts['date'] = util.parsedate(date)
|
|
|
|
if revs:
|
|
nodes = [repo.lookup(n) for n in revs]
|
|
else:
|
|
nodes = [node for node in repo.dirstate.parents()
|
|
if node != hgnode.nullid]
|
|
if len(nodes) > 1:
|
|
raise error.Abort(_('uncommitted merge - please provide a '
|
|
'specific revision'))
|
|
if not nodes:
|
|
nodes = [repo.changelog.tip()]
|
|
|
|
for n in nodes:
|
|
hexnode = hgnode.hex(n)
|
|
ui.write(_("signing %d:%s\n") % (repo.changelog.rev(n),
|
|
hgnode.short(n)))
|
|
# build data
|
|
data = node2txt(repo, n, sigver)
|
|
sig = mygpg.sign(data)
|
|
if not sig:
|
|
raise error.Abort(_("error while signing"))
|
|
sig = binascii.b2a_base64(sig)
|
|
sig = sig.replace("\n", "")
|
|
sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)
|
|
|
|
# write it
|
|
if opts['local']:
|
|
repo.vfs.append("localsigs", sigmessage)
|
|
return
|
|
|
|
if not opts["force"]:
|
|
msigs = match.exact(repo.root, '', ['.hgsigs'])
|
|
if any(repo.status(match=msigs, unknown=True, ignored=True)):
|
|
raise error.Abort(_("working copy of .hgsigs is changed "),
|
|
hint=_("please commit .hgsigs manually"))
|
|
|
|
sigsfile = repo.wfile(".hgsigs", "ab")
|
|
sigsfile.write(sigmessage)
|
|
sigsfile.close()
|
|
|
|
if '.hgsigs' not in repo.dirstate:
|
|
repo[None].add([".hgsigs"])
|
|
|
|
if opts["no_commit"]:
|
|
return
|
|
|
|
message = opts['message']
|
|
if not message:
|
|
# we don't translate commit messages
|
|
message = "\n".join(["Added signature for changeset %s"
|
|
% hgnode.short(n)
|
|
for n in nodes])
|
|
try:
|
|
editor = cmdutil.getcommiteditor(editform='gpg.sign', **opts)
|
|
repo.commit(message, opts['user'], opts['date'], match=msigs,
|
|
editor=editor)
|
|
except ValueError as inst:
|
|
raise error.Abort(str(inst))
|
|
|
|
def shortkey(ui, key):
|
|
if len(key) != 16:
|
|
ui.debug("key ID \"%s\" format error\n" % key)
|
|
return key
|
|
|
|
return key[-8:]
|
|
|
|
def node2txt(repo, node, ver):
|
|
"""map a manifest into some text"""
|
|
if ver == "0":
|
|
return "%s\n" % hgnode.hex(node)
|
|
else:
|
|
raise error.Abort(_("unknown signature version"))
|