marks = {}
file_changes = []
- for path, (node_id, change_type, text_changed, prop_changed) in changes.iteritems():
+ for path, (node_id, change_type, text_changed, prop_changed) in changes.items():
if root.is_dir(path):
continue
if not MATCHER.matches(path):
first_rev = 1
if final_rev is None:
final_rev = fs_obj.youngest_revision()
- for rev in xrange(first_rev, final_rev + 1):
+ for rev in range(first_rev, final_rev + 1):
export_revision(rev, fs_obj)
# Demonstrates how to do a new commit using Subvertpy
import os
-from cStringIO import StringIO
+from io import BytesIO
from subvertpy import delta, repos
from subvertpy.ra import RemoteAccess, Auth, get_username_provider
file.change_prop("svn:executable", "*")
# Obtain a textdelta handler and send the new file contents
txdelta = file.apply_textdelta()
-delta.send_stream(StringIO("new file contents"), txdelta)
+delta.send_stream(BytesIO(b"new file contents"), txdelta)
file.close()
root.close()
editor.close()
if len(x) <= 1:
raise NeedMoreData("Missing whitespace")
- if not x[1:2] in whitespace:
+ if not x[1] in whitespace:
raise MarshallError("Expected space, got '%c'" % x[1])
return (x[2:], ret)
num = bytearray()
# Check if this is a string or a number
while x[:1].isdigit():
- num.append(x[0:1])
+ num.append(x[0])
x = x[1:]
num = int(num)
- if x[0:1] in whitespace:
+ if x[0] in whitespace:
return (x[1:], num)
elif x[0:1] == b":":
if len(x) < num:
raise NeedMoreData("Expected string of length %r" % num)
return (x[num+2:], x[1:num+1])
+ elif not x:
+ raise MarshallError("Expected whitespace, got end of string.")
else:
raise MarshallError("Expected whitespace or ':', got '%c'" % x[0])
elif x[:1].isalpha():
if not x:
raise MarshallError("Expected whitespace, got end of string.")
- if not x[0:1] in whitespace:
+ if not x[0] in whitespace:
raise MarshallError("Expected whitespace, got '%c'" % x[0])
return (x[1:], ret.decode("ascii"))
else:
return "%d-%d%s" % (start, end, suffix)
text = ""
- for (path, ranges) in merges.iteritems():
+ for (path, ranges) in merges.items():
assert path.startswith("/")
text += "%s:%s\n" % (path, ",".join(map(formatrange, ranges)))
return text
with the old and the new property value.
"""
ret = {}
- for key, newval in current.iteritems():
+ for key, newval in current.items():
oldval = previous.get(key)
if oldval != newval:
ret[key] = (oldval, newval)
if recursive:
return ret
else:
- return ret.values()[0]
+ return next(iter(ret.values()))
def client_get_revprop(self, url, revnum, name):
"""Get the revision property.
:param files: Dictionary with filenames as keys, contents as
values. None as value indicates a directory.
"""
- for name, content in files.iteritems():
+ for name, content in files.items():
if content is None:
try:
os.makedirs(name)
from datetime import datetime, timedelta
import os
-from StringIO import StringIO
+from io import BytesIO
import shutil
import tempfile
from unittest import SkipTest
auth=ra.Auth([ra.get_username_provider()]))
dc = self.get_commit_editor(self.repos_url)
f = dc.add_file("foo")
- f.modify("foo1")
+ f.modify(b"foo1")
dc.close()
dc = self.get_commit_editor(self.repos_url)
f = dc.open_file("foo")
- f.modify("foo2")
+ f.modify(b"foo2")
dc.close()
if client.api_version() < (1, 5):
self.assertEqual(b"", errf.read())
def assertCatEquals(self, value, revision=None):
- io = StringIO()
+ io = BytesIO()
self.client.cat("dc/foo", io, revision)
self.assertEqual(value, io.getvalue())
self.client.log_msg_func = lambda c: "Commit"
self.client.commit(["dc"])
info = self.client.info("dc/foo")
- self.assertEqual(["foo"], info.keys())
+ self.assertEqual(["foo"], list(info.keys()))
self.assertEqual(1, info["foo"].revision)
self.assertEqual(3, info["foo"].size)
- self.build_tree({"dc/bar": "blablabla"})
+ self.build_tree({"dc/bar": b"blablabla"})
self.client.add(os.path.abspath("dc/bar"))
def test_info_nonexistant(self):
self.windows.append(window)
def test_send_stream(self):
- stream = BytesIO("foo")
+ stream = BytesIO(b"foo")
send_stream(stream, self.storing_window_handler)
self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None],
self.windows)
self.assertEqual((b'', b"bla l"), unmarshall(b"5:bla l"))
def test_unmarshall_list(self):
- self.assertEqual((b'', [4,5]), unmarshall(b"( 4 5 ) "))
+ self.assertEqual((b'', [4, 5]), unmarshall(b"( 4 5 ) "))
def test_unmarshall_int(self):
self.assertEqual((b'', 2), unmarshall(b"2 "))
"""Subversion ra library tests."""
-from cStringIO import StringIO
+from io import BytesIO
from subvertpy import (
NODE_DIR, NODE_NONE, NODE_UNKNOWN,
(paths, revnum, props, has_children) = returned[0]
self.assertEqual(None, paths)
self.assertEqual(revnum, 0)
- self.assertEqual(["svn:date"], props.keys())
+ self.assertEqual(["svn:date"], list(props.keys()))
if len(returned[1]) == 3:
(paths, revnum, props) = returned[1]
else:
(paths, revnum, props, has_children) = returned[0]
self.assertEqual(None, paths)
self.assertEqual(revnum, 0)
- self.assertEqual(["svn:date"], props.keys())
+ self.assertEqual(["svn:date"], list(props.keys()))
if len(returned[1]) == 3:
(paths, revnum, props) = returned[1]
else:
f.change_prop("bla:bar", None)
cb.close()
- stream = StringIO()
+ stream = BytesIO()
props = self.ra.get_file("bar", stream, 1)[1]
self.assertEqual("blie", props.get("bla:bar"))
- stream = StringIO()
+ stream = BytesIO()
props = self.ra.get_file("bar", stream, 2)[1]
self.assertIs(None, props.get("bla:bar"))
cb.add_file("bar").modify(b"a")
cb.close()
- stream = StringIO()
+ stream = BytesIO()
self.ra.get_file("bar", stream, 1)
stream.seek(0)
self.assertEqual(b"a", stream.read())
- stream = StringIO()
+ stream = BytesIO()
self.ra.get_file("/bar", stream, 1)
stream.seek(0)
self.assertEqual(b"a", stream.read())
"""Subversion repository library tests."""
-from cStringIO import StringIO
+from io import BytesIO
import os
import textwrap
def test_verify_fs(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
- f = StringIO()
+ f = BytesIO()
r.verify_fs(f, 0, 0)
self.assertEqual(b'* Verified revision 0.\n', f.getvalue())
def test_load_fs_invalid(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
- dumpfile = "Malformed"
- feedback = StringIO()
- self.assertRaises(SubversionException, r.load_fs, StringIO(dumpfile),
+ dumpfile = b"Malformed"
+ feedback = BytesIO()
+ self.assertRaises(SubversionException, r.load_fs, BytesIO(dumpfile),
feedback, repos.LOAD_UUID_DEFAULT)
def test_load_fs(self):
V 27
2011-08-26T13:08:30.187858Z
PROPS-END
- """)
- feedback = StringIO()
- r.load_fs(StringIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT)
+ """).encode("ascii")
+ feedback = BytesIO()
+ r.load_fs(BytesIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT)
self.assertEqual(r.fs().get_uuid(), "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca")
def test_rev_props(self):
repos.create(os.path.join(self.test_dir, "foo"))
- self.assertEqual(["svn:date"], repos.Repository("foo").fs().revision_proplist(0).keys())
+ self.assertEqual(["svn:date"], list(repos.Repository("foo").fs().revision_proplist(0).keys()))
def test_rev_root_invalid(self):
repos.create(os.path.join(self.test_dir, "foo"))
"""Subversion ra library tests."""
-from StringIO import StringIO
+from io import BytesIO
import os
from unittest import SkipTest
def test_add_repos_file(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
- adm.add_repos_file("checkout/bar", StringIO("basecontents"), StringIO("contents"), {}, {})
+ adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"), BytesIO(b"contents"), {}, {})
self.assertEqual(b"basecontents", wc.get_pristine_contents("checkout/bar").read())
def test_mark_missing_deleted(self):