# refs.py -- For dealing with git refs
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation;
# version 2.0 or (at your option) any later version. You can redistribute it
# and/or modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
import errno
import os
import sys

from dulwich.errors import (
    PackedRefsException,
    RefFormatError,
    )
from dulwich.objects import (
    git_line,
    valid_hexsha,
    ZERO_SHA,
    )
from dulwich.file import (
    GitFile,
    ensure_dir_exists,
    )
# Marker that starts the contents of a symbolic ref ("ref: <target>").
SYMREF = b'ref: '
# Namespaces for local branches and tags.
LOCAL_BRANCH_PREFIX = b'refs/heads/'
LOCAL_TAG_PREFIX = b'refs/tags/'
# Characters that may not appear anywhere in a ref name (see
# check_ref_format); control characters are rejected separately.
BAD_REF_CHARS = set(b'\177 ~^:?*[')
# Suffix appended to a tag's name for the entry carrying its peeled value.
ANNOTATED_TAG_SUFFIX = b'^{}'
def parse_symref_value(contents):
    """Parse a symref value.

    :param contents: Contents to parse
    :return: What follows the SYMREF marker, with trailing CR/LF removed
    :raises ValueError: if contents does not start with the SYMREF marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    return contents[len(SYMREF):].rstrip(b'\r\n')
def check_ref_format(refname):
    """Check if a refname is correctly formatted.

    Implements all the same rules as git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    :param refname: The refname to check
    :return: True if refname is valid, False otherwise
    """
    # These could be combined into one big expression, but are listed
    # separately to parallel [1].
    if b'/.' in refname or refname.startswith(b'.'):
        return False
    if b'/' not in refname:
        return False
    if b'..' in refname:
        return False
    # Slicing (rather than indexing) yields bytes on both Python 2 and 3.
    if refname[-1:] in (b'/', b'.'):
        return False
    if refname.endswith(b'.lock'):
        return False
    if b'@{' in refname:
        return False
    if b'\\' in refname:
        return False
    for i, c in enumerate(refname):
        # Reject ASCII control characters and the explicitly banned ones.
        if ord(refname[i:i+1]) < 0o40 or c in BAD_REF_CHARS:
            return False
    return True
class RefsContainer(object):
    """A container for refs."""

    def __init__(self, logger=None):
        # Optional callable that _log() invokes for each recorded change.
        self._logger = logger

    def _log(self, ref, old_sha, new_sha, committer=None, timestamp=None,
             timezone=None, message=None):
        """Pass a ref change on to the logger.

        Nothing is passed on when no logger was given or when message is
        None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp,
                     timezone, message)

    def set_symbolic_ref(self, name, other, committer=None, timestamp=None,
                         timezone=None, message=None):
        """Make a ref point at another ref.

        :param name: Name of the ref to set
        :param other: Name of the ref to point at
        :param message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        :return: Dictionary mapping ref names to SHA1s

        :note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        :param name: Name of the ref to peel
        :return: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(self, base, other, committer=None, timestamp=None,
                    timezone=None, message=None):
        """Set every ref in a dictionary below a common base.

        :param base: Prefix joined to each name with a '/'
        :param other: Dictionary mapping ref names to SHA1s
        :param message: Optional message, forwarded with the other
            reflog arguments to set_if_equals()
        """
        for name, value in other.items():
            self.set_if_equals(b'/'.join((base, name)), None, value,
                               committer=committer, timestamp=timestamp,
                               timezone=timezone, message=message)

    def allkeys(self):
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def keys(self, base=None):
        """Refs present in this container.

        :param base: An optional base to return refs under.
        :return: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        :param base: The base to return refs under.
        :return: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        keys = set()
        # +1 also drops the '/' separating the base from the rest.
        base_len = len(base) + 1
        for refname in self.allkeys():
            if refname.startswith(base):
                keys.add(refname[base_len:])
        return keys

    def as_dict(self, base=None):
        """Return the contents of this container as a dictionary.

        :param base: An optional base; when given, keys are relative to it
        :return: Dictionary mapping ref names to resolved SHA1s. Refs that
            cannot be resolved are left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b''
        else:
            base = base.rstrip(b'/')
        for key in keys:
            try:
                ret[key] = self[(base + b'/' + key).strip(b'/')]
            except KeyError:
                continue  # Unable to resolve
        return ret

    def _check_refname(self, name):
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        :param name: The name of the reference.
        :raises RefFormatError: if a refname is not HEAD (or refs/stash) and
            is otherwise not valid.
        """
        if name in (b'HEAD', b'refs/stash'):
            return
        if not name.startswith(b'refs/') or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        :param refname: The name of the reference
        :return: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name):
        """Read a loose reference and return its contents.

        :param name: the refname to read
        :return: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name):
        """Follow a reference name.

        :return: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        :raises KeyError: if the chain of symbolic refs is too long
        """
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF):]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            # Guard against symref loops and overly long chains.
            if depth > 5:
                raise KeyError(name)
        return refnames, contents

    def _follow(self, name):
        """Deprecated variant of follow().

        :return: a tuple of (last refname in the chain or None, sha)
        """
        import warnings
        warnings.warn(
            "RefsContainer._follow is deprecated. Use RefsContainer.follow "
            "instead.", DeprecationWarning, stacklevel=2)
        refnames, contents = self.follow(name)
        if not refnames:
            return (None, contents)
        return (refnames[-1], contents)

    def __contains__(self, refname):
        return bool(self.read_ref(refname))

    def __getitem__(self, name):
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        :raises KeyError: if the reference does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(self, name, old_ref, new_ref, committer=None,
                      timestamp=None, timezone=None, message=None):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        :param name: The refname to set.
        :param old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
        :param new_ref: The new sha the refname will refer to.
        :param message: Message for reflog
        :return: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(self, name, ref, committer=None, timestamp=None,
                   timezone=None, message=None):
        """Add a new reference only if it does not already exist.

        :param name: Ref name
        :param ref: Ref value
        :param message: Message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref):
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        :note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().
        :param name: The refname to set.
        :param ref: The new sha the refname will refer to.
        """
        self.set_if_equals(name, None, ref)

    def remove_if_equals(self, name, old_ref, committer=None,
                         timestamp=None, timezone=None, message=None):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        :param name: The refname to delete.
        :param old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
        :param message: Message for reflog
        :return: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name):
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        :note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        :param name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        :return: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            try:
                dst = parse_symref_value(self.read_ref(src))
            except ValueError:
                # Not a symbolic ref.
                pass
            else:
                ret[src] = dst
        return ret
class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    This container does not support packed references and is not
    threadsafe.
    """

    def __init__(self, refs, logger=None):
        super(DictRefsContainer, self).__init__(logger=logger)
        # The dict is used as-is (not copied); writes go straight into it.
        self._refs = refs
        self._peeled = {}

    def allkeys(self):
        return self._refs.keys()

    def read_loose_ref(self, name):
        return self._refs.get(name, None)

    def get_packed_refs(self):
        return {}

    def set_symbolic_ref(self, name, other, committer=None,
                         timestamp=None, timezone=None, message=None):
        # The value name resolved to before the change is logged as both
        # the old and the new value.
        old = self.follow(name)[-1]
        self._refs[name] = SYMREF + other
        self._log(name, old, old, committer=committer, timestamp=timestamp,
                  timezone=timezone, message=message)

    def set_if_equals(self, name, old_ref, new_ref, committer=None,
                      timestamp=None, timezone=None, message=None):
        if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
            return False
        realnames, _ = self.follow(name)
        # Every ref in the symref chain is set to the new value.
        for realname in realnames:
            self._check_refname(realname)
            old = self._refs.get(realname)
            self._refs[realname] = new_ref
            self._log(realname, old, new_ref, committer=committer,
                      timestamp=timestamp, timezone=timezone, message=message)
        return True

    def add_if_new(self, name, ref, committer=None, timestamp=None,
                   timezone=None, message=None):
        if name in self._refs:
            return False
        self._refs[name] = ref
        self._log(name, None, ref, committer=committer, timestamp=timestamp,
                  timezone=timezone, message=message)
        return True

    def remove_if_equals(self, name, old_ref, committer=None, timestamp=None,
                         timezone=None, message=None):
        if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
            return False
        try:
            old = self._refs.pop(name)
        except KeyError:
            # Nothing to remove; still reported as a success.
            pass
        else:
            self._log(name, old, None, committer=committer,
                      timestamp=timestamp, timezone=timezone, message=message)
        return True

    def get_peeled(self, name):
        return self._peeled.get(name)

    def _update(self, refs):
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        self._refs.update(refs)

    def _update_peeled(self, peeled):
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f):
        """Parse an info/refs file.

        :param f: file-like object yielding "<sha>\\t<name>" lines
        :raises ValueError: if a line is malformed or names an invalid ref
        """
        super(InfoRefsContainer, self).__init__()
        self._refs = {}
        self._peeled = {}
        for line in f.readlines():
            sha, name = line.rstrip(b'\r\n').split(b'\t', 1)
            if name.endswith(ANNOTATED_TAG_SUFFIX):
                # A "<tag>^{}" entry carries the peeled value of <tag>.
                name = name[:-len(ANNOTATED_TAG_SUFFIX)]
                if not check_ref_format(name):
                    raise ValueError("invalid ref name %r" % name)
                self._peeled[name] = sha
            else:
                if not check_ref_format(name):
                    raise ValueError("invalid ref name %r" % name)
                self._refs[name] = sha

    def allkeys(self):
        return self._refs.keys()

    def read_loose_ref(self, name):
        return self._refs.get(name, None)

    def get_packed_refs(self):
        return {}

    def get_peeled(self, name):
        # Refs without a peeled entry peel to the value they point at.
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(self, path, worktree_path=None, logger=None):
        """Create a container rooted at a directory.

        :param path: Directory holding the refs and packed-refs
        :param worktree_path: Directory holding HEAD; defaults to path
        :param logger: Optional callable used to record ref changes
        """
        super(DiskRefsContainer, self).__init__(logger=logger)
        # Paths are kept as bytes; anything with an encode() method is
        # encoded using the filesystem encoding.
        if getattr(path, 'encode', None) is not None:
            path = path.encode(sys.getfilesystemencoding())
        self.path = path
        if worktree_path is None:
            worktree_path = path
        if getattr(worktree_path, 'encode', None) is not None:
            worktree_path = worktree_path.encode(sys.getfilesystemencoding())
        self.worktree_path = worktree_path
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.path)

    def subkeys(self, base):
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            reldir = root[len(path):]
            if os.path.sep != '/':
                reldir = reldir.replace(os.path.sep.encode(
                    sys.getfilesystemencoding()), b"/")
            reldir = reldir.strip(b'/')
            for filename in files:
                refname = b"/".join(([reldir] if reldir else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b'/' + refname):
                    subkeys.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                subkeys.add(key[len(base):].strip(b'/'))
        return subkeys

    def allkeys(self):
        allkeys = set()
        if os.path.exists(self.refpath(b'HEAD')):
            allkeys.add(b'HEAD')
        path = self.refpath(b'')
        refspath = self.refpath(b'refs')
        for root, unused_dirs, files in os.walk(refspath):
            reldir = root[len(path):]
            if os.path.sep != '/':
                reldir = reldir.replace(
                    os.path.sep.encode(sys.getfilesystemencoding()), b"/")
            for filename in files:
                refname = b"/".join([reldir, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref.

        :param name: The refname, using '/' as separator
        """
        if os.path.sep != "/":
            name = name.replace(
                b"/",
                os.path.sep.encode(sys.getfilesystemencoding()))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == b'HEAD':
            return os.path.join(self.worktree_path, name)
        return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        :return: Dictionary mapping ref names to SHA1s

        :note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b'packed-refs')
            try:
                f = GitFile(path, 'rb')
            except IOError as e:
                if e.errno == errno.ENOENT:
                    return {}
                raise
            with f:
                # An empty file has no first line; treat it as a file
                # without peeled information instead of failing.
                first_line = next(iter(f), b'').rstrip()
                if (first_line.startswith(b'# pack-refs') and b' peeled' in
                        first_line):
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    # The first line may be a ref; start over.
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        :param name: Name of the ref to peel
        :return: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        # Known not peelable
        return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        :param name: the refname to read, relative to refpath
        :return: The contents of the ref file, or None if the file does not
            exist.
        :raises IOError: if any other error occurs
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, 'rb') as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line
                    return header + next(iter(f)).rstrip(b'\r\n')
                # Read only the first 40 bytes
                return header + f.read(40 - len(SYMREF))
        except IOError as e:
            if e.errno in (errno.ENOENT, errno.EISDIR, errno.ENOTDIR):
                return None
            raise

    def _remove_packed_ref(self, name):
        """Rewrite the packed-refs file without the given ref."""
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b'packed-refs')
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, 'wb')
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            if name in self._peeled_refs:
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            f.abort()

    def set_symbolic_ref(self, name, other, committer=None, timestamp=None,
                         timezone=None, message=None):
        """Make a ref point at another ref.

        :param name: Name of the ref to set
        :param other: Name of the ref to point at
        :param message: Optional message to describe the change
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, 'wb')
        try:
            f.write(SYMREF + other + b'\n')
            sha = self.follow(name)[-1]
            self._log(name, sha, sha, committer=committer,
                      timestamp=timestamp, timezone=timezone,
                      message=message)
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(self, name, old_ref, new_ref, committer=None,
                      timestamp=None, timezone=None, message=None):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        :param name: The refname to set.
        :param old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
        :param new_ref: The new sha the refname will refer to.
        :param message: Set message for reflog
        :return: True if the set was successful, False otherwise.
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, 'wb') as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(
                            realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except (OSError, IOError):
                    f.abort()
                    raise
            try:
                f.write(new_ref + b'\n')
            except (OSError, IOError):
                f.abort()
                raise
            self._log(realname, old_ref, new_ref, committer=committer,
                      timestamp=timestamp, timezone=timezone, message=message)
        return True

    def add_if_new(self, name, ref, committer=None, timestamp=None,
                   timezone=None, message=None):
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        :param name: The refname to set.
        :param ref: The new sha the refname will refer to.
        :param message: Optional message for reflog
        :return: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, 'wb') as f:
            if os.path.exists(filename) or name in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b'\n')
            except (OSError, IOError):
                f.abort()
                raise
            else:
                self._log(name, None, ref, committer=committer,
                          timestamp=timestamp, timezone=timezone,
                          message=message)
        return True

    def remove_if_equals(self, name, old_ref, committer=None, timestamp=None,
                         timezone=None, message=None):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        :param name: The refname to delete.
        :param old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
        :param message: Optional message
        :return: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, 'wb')
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                os.remove(filename)
            except OSError as e:
                if e.errno != errno.ENOENT:  # may only be packed
                    raise

            self._remove_packed_ref(name)
            self._log(name, old_ref, None, committer=committer,
                      timestamp=timestamp, timezone=timezone, message=message)
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b'/', 1)
            except ValueError:
                break

            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True
def _split_ref_line(line):
    """Split a single ref line into a tuple of SHA1 and name.

    :param line: A "<sha> <name>" line; trailing CR/LF is ignored
    :return: Tuple of (sha, name)
    :raises PackedRefsException: if the line does not have exactly two
        space-separated fields, or the sha or the name is invalid
    """
    fields = line.rstrip(b'\n\r').split(b' ')
    if len(fields) != 2:
        raise PackedRefsException("invalid ref line %r" % line)
    sha, name = fields
    if not valid_hexsha(sha):
        raise PackedRefsException("Invalid hex sha %r" % sha)
    if not check_ref_format(name):
        raise PackedRefsException("invalid ref name %r" % name)
    return (sha, name)
def read_packed_refs(f):
    """Read a packed refs file.

    :param f: file-like object to read from
    :return: Iterator over tuples with SHA1s and ref names.
    :raises PackedRefsException: if a peeled ("^") line is found
    """
    for line in f:
        if line.startswith(b'#'):
            # Comment
            continue
        if line.startswith(b'^'):
            raise PackedRefsException(
                "found peeled ref in packed-refs without peeled")
        yield _split_ref_line(line)
def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
    with SHA1s, ref names, and peeled SHA1s (or None).

    :param f: file-like object to read from, seek'ed to the second line
    :raises PackedRefsException: if a peeled line has no ref line before
        it or does not hold a valid hex sha
    """
    # A ref line is held back until the next line shows whether a peeled
    # value follows it.
    last = None
    for line in f:
        if line.startswith(b'#'):
            continue
        line = line.rstrip(b'\r\n')
        if line.startswith(b'^'):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException("Invalid hex sha %r" % line[1:])
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)
def write_packed_refs(f, packed_refs, peeled_refs=None):
    """Write a packed refs file.

    The "# pack-refs with: peeled" header is only written when peeled_refs
    is given (even if it is empty).

    :param f: empty file-like object to write to
    :param packed_refs: dict of refname to sha of packed refs to write
    :param peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is None:
        peeled_refs = {}
    else:
        f.write(b'# pack-refs with: peeled\n')
    for refname in sorted(packed_refs.keys()):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled_refs:
            f.write(b'^' + peeled_refs[refname] + b'\n')
def read_info_refs(f):
    """Read an info/refs file.

    :param f: file-like object yielding "<sha>\\t<name>" lines
    :return: Dictionary mapping ref names to SHA1s
    """
    ret = {}
    for line in f.readlines():
        (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
        ret[name] = sha
    return ret
def write_info_refs(refs, store):
    """Generate info refs.

    :param refs: Dictionary mapping ref names to SHA1s
    :param store: Object store; looked up by sha and asked to peel via
        peel_sha()
    :return: Iterator over the lines of an info/refs file, sorted by name
    """
    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == b'HEAD':
            continue
        try:
            o = store[sha]
        except KeyError:
            # Refs whose object is not in the store are left out.
            continue
        peeled = store.peel_sha(sha)
        yield o.id + b'\t' + name + b'\n'
        if o.id != peeled.id:
            yield peeled.id + b'\t' + name + ANNOTATED_TAG_SUFFIX + b'\n'
def is_local_branch(x):
    """Check whether a ref name starts with LOCAL_BRANCH_PREFIX.

    :param x: The ref name to check
    :return: True if x starts with LOCAL_BRANCH_PREFIX, False otherwise
    """
    return x.startswith(LOCAL_BRANCH_PREFIX)
def strip_peeled_refs(refs):
    """Remove all peeled refs.

    :param refs: Dictionary mapping ref names to SHA1s
    :return: New dictionary without the entries whose name ends with
        ANNOTATED_TAG_SUFFIX
    """
    return {ref: sha for (ref, sha) in refs.items()
            if not ref.endswith(ANNOTATED_TAG_SUFFIX)}