CHECK-VERSIONS: Add SVN wire protocol support

This commit is contained in:
Le Philousophe
2020-12-27 21:18:17 +01:00
parent 3830cc62ba
commit 523b3b366a
2 changed files with 510 additions and 7 deletions
+45 -7
View File
@@ -3,6 +3,7 @@ import urllib.parse as urlp
import xml.etree.ElementTree as ET
import checkers
import svn_protocol
def parse_multistatus(baseurl, data):
if data.tag != '{DAV:}multistatus':
@@ -56,18 +57,23 @@ def fetch_props(url, depth, props):
return parse_multistatus(url, data)
def svn_commit(version, *, repository):
def dav_get_version(repository):
objects = fetch_props(repository, 0, ('{DAV:}version-name', ))
# Reply has only 1 item
props = next(iter(objects.values()))
online_version = props['{DAV:}version-name']
return online_version == version, online_version, "repository URL: {0}".format(repository)
return online_version
checkers.register('svn commit', svn_commit)
def svn_get_version(repository):
with svn_protocol.SVNClient(repository) as client:
props = client.get_props('')
online_version = props[b'svn:entry:committed-rev'].decode('utf-8')
def svn_tag(version, *, repository, delimiter='.', **kwargs):
return online_version
def dav_list_tags(repository):
objects = fetch_props(repository, 1, tuple())
urls = objects.keys()
@@ -77,15 +83,47 @@ def svn_tag(version, *, repository, delimiter='.', **kwargs):
basepath = basepath.split('/')
basepath[1:] = filter(None, basepath[1:])
basepath = '/'.join(basepath)
# Unquote, isolate path and remove leading /
# Unquote, isolate path and remove trailing /
# URLs are already normalized by urljoin
paths = (urlp.urlsplit(urlp.unquote(url)).path.rstrip('/') for url in urls)
# Remove basepath (we don't need current directory, only children)
matching_tags = (path.rsplit('/', 1)[1] for path in paths if path != basepath)
names = (path.rsplit('/', 1)[1] for path in paths if path != basepath)
return names
def svn_list_tags(repository):
with svn_protocol.SVNClient(repository) as client:
entries = client.get_entries('', None, tuple())
if entries is None:
raise Exception("Can't list SVN entries for {0}".format(repository))
names = (entry.name.decode('utf-8') for entry in entries)
return names
def svn_commit(version, *, repository):
parts = urlp.urlsplit(repository, allow_fragments=False)
if parts.scheme.lower() == 'svn':
online_version = svn_get_version(repository)
elif parts.scheme.lower() in('http' ,'https'):
online_version = dav_get_version(repository)
else:
raise Exception("Scheme {0} not supported as SVN protocol for {1}".format(parts.scheme.lower(), repository))
return online_version == version, online_version, "repository URL: {0}".format(repository)
checkers.register('svn commit', svn_commit)
def svn_tag(version, *, repository, **kwargs):
parts = urlp.urlsplit(repository, allow_fragments=False)
if parts.scheme.lower() == 'svn':
names = svn_list_tags(repository)
elif parts.scheme.lower() in('http' ,'https'):
names = dav_list_tags(repository)
else:
raise Exception("Scheme {0} not supported as SVN protocol for {1}".format(parts.scheme.lower(), repository))
# Apply user filters, change delimiter and sort
matching_tags = checkers.prepare_versions(matching_tags, **kwargs)
matching_tags = checkers.prepare_versions(names, **kwargs)
if len(matching_tags) == 0:
print("WARNING: no matching tags for {0} with {1}".format(
+465
View File
@@ -0,0 +1,465 @@
import base64
import collections
import hashlib
import socket
import urllib.parse as urlp
__all__ = ['SVNClient']
class SVNProtocolError(Exception):
pass
class SVNServerError(SVNProtocolError):
def __init__(self, msg):
super().__init__('Server replied with error: {0!r}'.format(msg))
self.server_error = msg
class SVNString(bytes):
pass
DirEntry = collections.namedtuple('DirEntry', ['name', 'kind', 'size', 'has_props', 'created_rev', 'created_date', 'last_author'])
class SVNClient:
def __init__(self, url):
parts = urlp.urlsplit(url, allow_fragments=False)
if parts.scheme.lower() != 'svn':
raise Exception("SVN is the only scheme supported for pure SVN protocol for {0}".format(url))
self.url = url
port = parts.port
if port is None:
port = 3690
self.server = (parts.hostname, port)
self.s = None
def connect(self):
if self.s:
return
self.s = socket.create_connection(self.server)
caps = greeting(self.s, self.url)
result, extra = auth(self.s)
if not result:
self.s.close()
self.s = None
raise SVNServerError('Authentication failed: {0}'.format(extra.decode('utf-8')))
self.auth_token = extra
uuid, repo_url, repo_caps = repos_info(self.s)
self.uuid = uuid
self.repo_url = repo_url
self.caps = caps + repo_caps
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.s.close()
self.s = None
return False
def get_latest_rev(self):
return get_latest_rev(self.s)
def check_path(self, path, rev = None):
return check_path(self.s, path, rev)
def get_dir(self, path, rev = None, want_props=False, want_contents=False, dirent_fields=None):
return get_dir(self.s, path, rev, want_props, want_contents, dirent_fields)
def get_file(s, path, rev, want_props=False, want_contents=False):
return get_file(self.s, path, rev, want_props, want_contents)
def get_props(self, path, rev=None):
kind = self.check_path(path, rev)
if kind == b'dir':
revision, props, entries = self.get_dir(path, rev, True, False, None)
elif kind == b'file':
checksum, revision, props, content = self.get_file(path, rev, True, False)
else:
props = None
return props
def get_entries(self, path, rev=None, dirent_fields=None):
kind = self.check_path(path, rev)
if kind == b'dir':
revision, props, entries = self.get_dir(path, rev, False, True, dirent_fields)
else:
entries = None
return entries
# TOKENS
OPENING_PAREN = object()
CLOSING_PAREN = object()
SPACES = b' \n'
BOOLEANS = [b'false', b'true']
KINDS = [b'none', b'file', b'dir', b'unknown']
def build_reply(obj):
if type(obj) is SVNString:
return b'%d:%b ' % (len(obj), obj)
elif type(obj) is str:
return b'%d:%b ' % (len(obj), obj.encode('ascii'))
elif type(obj) is bytes:
return b'%b ' % (obj, )
elif type(obj) is int:
return b'%d ' % (obj, )
elif type(obj) is bool:
return b'true ' if obj else b'false '
elif type(obj) is tuple:
return b'( %b) ' % (b''.join(
build_reply(o) for o in obj), )
else:
raise Exception("Invalid object type {0!s} when building svn reply", type(obj))
def readit(s, sz=1):
while True:
yield s.recv(sz)
def read_string(s, length):
# Read ending space too
length += 1
data = bytearray(length)
view = memoryview(data)
while len(view) > 0:
rln = s.recv_into(view)
if rln == 0:
raise SVNProtocolError('Unexpected end of stream while reading a string of length {0}'.format(length))
view = view[rln:]
if data[-1:] not in SPACES:
raise SVNProtocolError('Unexpected character at end of string: {0!r}'.format(data[-1:]))
return SVNString(data[:-1])
def read_tokens(s):
buf = None
for b in readit(s):
if not b:
# End of stream
return
elif b in SPACES:
if buf is not None:
yield buf
buf = None
continue
elif b == b':':
if type(buf) is not int:
raise SVNProtocolError('Unexpected colon without digits before: {0!r}'.format(buf))
yield read_string(s, buf)
buf = None
elif b == b')':
if buf is not None:
raise SVNProtocolError('Unexpected closing parenthesis after digits or alpha: {0!r}'.format(buf))
buf = s.recv(1)
if buf not in SPACES:
raise SVNProtocolError('Missing space after closing parenthesis: {0!r}'.format(buf))
yield CLOSING_PAREN
buf = None
elif b == b'(':
if buf is not None:
raise SVNProtocolError('Unexpected opening parenthesis after digits or alpha: {0!r}'.format(buf))
buf = s.recv(1)
if buf not in SPACES:
raise SVNProtocolError('Missing space after closing parenthesis: {0!r}'.format(buf))
yield OPENING_PAREN
buf = None
elif b.isdigit():
# Accumulate to buffer
d = int(b, 10)
if buf is None:
buf = d
elif type(buf) is int:
buf = buf * 10 + d
else:
buf += b
elif b.isalpha():
if buf is None:
buf = b
elif type(buf) is int:
raise SVNProtocolError('Unexpected alpha {1!r} after digits: {0!r}'.format(buf, b))
else:
buf += b
elif b == b'-':
if buf is None:
raise SVNProtocolError('Unexpected dash out of nowhere')
elif type(buf) is int:
raise SVNProtocolError('Unexpected dash after digits: {0!r}'.format(buf, b))
else:
buf += b
def read_tuple(s, got_open=False):
it = read_tokens(s)
if not got_open:
try:
token = next(it)
except StopIteration:
raise SVNProtocolError('Unexpected end of stream') from None
if token is not OPENING_PAREN:
raise SVNProtocolError('An opening parenthesis was expected, got: {0!r}'.format(token))
items = list()
for token in it:
if token is CLOSING_PAREN:
return tuple(items)
elif token is OPENING_PAREN:
items.append(read_tuple(s, True))
else:
items.append(token)
raise SVNProtocolError('Unexpected end of stream, closing parenthesis expected')
def read_response(s):
l = read_tuple(s)
if len(l) != 2:
raise SVNProtocolError('Unexpected response length, got: {0!r}'.format(l))
if l[0] == b'success':
return l[1]
if l[0] == b'failure':
raise SVNServerError(l[1])
raise SVNProtocolError('Unexpected response result, got: {0!r}'.format(l[0]))
def parse_dirent(dirent):
assert(type(dirent) is tuple)
assert(len(dirent) == 7)
assert(type(dirent[0]) is SVNString)
assert(type(dirent[1]) is bytes and dirent[1] in KINDS)
assert(type(dirent[2]) is int)
assert(type(dirent[3]) is bytes and dirent[3] in BOOLEANS)
assert(type(dirent[4]) is int)
assert(type(dirent[5]) is tuple)
assert(len(dirent[5]) == 0 or (len(dirent[5]) == 1 and type(dirent[5][0]) is SVNString))
assert(type(dirent[6]) is tuple)
assert(len(dirent[6]) == 0 or (len(dirent[6]) == 1 and type(dirent[6][0]) is SVNString))
name = dirent[0]
kind = dirent[1]
size = dirent[2]
has_props = dirent[3] == b'true'
created_rev = dirent[4]
created_date = None if len(dirent[5]) == 0 else dirent[5][0]
last_author = None if len(dirent[6]) == 0 else dirent[6][0]
return DirEntry(name, kind, size, has_props, created_rev, created_date, last_author)
def greeting(s, url):
server_greeting = read_response(s)
assert(type(server_greeting) is tuple)
assert(len(server_greeting) == 4)
assert(type(server_greeting[0]) is int)
assert(type(server_greeting[1]) is int)
assert(type(server_greeting[2]) is tuple)
assert(type(server_greeting[3]) is tuple)
assert(all(type(i) is bytes for i in server_greeting[3]))
ver_min, ver_max, mechs, caps = server_greeting
if 2 not in range(ver_min, ver_max+1):
raise SVNProtocolError('Version {0}-{1} unsupported, wanted {2}'.format(ver_min, ver_max, 2))
client_greeting = build_reply((2, (b'edit-pipeline', b'svndiff1', b'accepts-svndiff2', b'absent-entries'), url, 'check-versions/1.0', tuple()))
s.send(client_greeting)
return caps
def auth(s):
auth_request = read_response(s)
assert(type(auth_request) is tuple)
assert(len(auth_request) == 2)
assert(type(auth_request[0]) is tuple)
assert(all(type(i) is bytes for i in auth_request[0]))
assert(type(auth_request[1]) is SVNString)
mechs, realm = auth_request
if b'ANONYMOUS' not in mechs:
raise SVNProtocolError('Only ANONYMOUS auth mechanism supported: {0!r}'.format(mechs))
auth_reply = build_reply((b'ANONYMOUS', (SVNString(base64.b64encode(b'anonymous@anonymous')), )))
s.send(auth_reply)
# Not a response but only a tuple
auth_result = read_tuple(s)
assert(type(auth_result) is tuple)
assert(len(auth_result) == 2)
assert(type(auth_result[0]) is bytes)
assert(type(auth_result[1]) is tuple)
if auth_result[0] == b'success':
assert(len(auth_result[1]) == 0 or (len(auth_result[1]) == 1 and
type(auth_result[1][0]) is SVNString))
token = auth_result[1][0] if len(auth_result[1]) else None
return True, token
elif auth_result[0] == b'failure':
assert(len(auth_result[1]) == 1)
assert(type(auth_result[1][0]) is SVNString)
return False, auth_result[1][0]
elif auth_result[0] == b'step':
raise SVNProtocolError('Step in authentication unsupported')
else:
raise SVNProtocolError('Got unknown authentication result: {0!r}'.format(auth_result[0]))
def repos_info(s):
infos = read_response(s)
assert(type(infos) is tuple)
assert(len(infos) == 3)
assert(type(infos[0]) is SVNString)
assert(type(infos[1]) is SVNString)
assert(type(infos[2]) is tuple)
assert(all(type(i) is bytes for i in infos[2]))
uuid, repo_url, caps = infos
return uuid, repo_url, caps
def send_command(s, command):
req = build_reply(command)
s.send(req)
auth_request = read_response(s)
assert(type(auth_request) is tuple)
assert(len(auth_request) == 2)
assert(type(auth_request[0]) is tuple)
assert(all(type(i) is bytes for i in auth_request[0]))
assert(type(auth_request[1]) is SVNString)
mechs, realm = auth_request
if len(mechs):
raise SVNProtocolError('Reauthentication not supported')
return read_response(s)
def get_latest_rev(s):
result = send_command(s, (b'get-latest-rev', tuple()))
assert(type(result) is tuple)
assert(len(result) == 1)
assert(type(result[0]) is int)
return result[0]
def check_path(s, path, rev = None):
if rev is None:
rev = tuple()
else:
rev = (rev, )
result = send_command(s, (b'check-path', (path, rev)))
assert(type(result) is tuple)
assert(len(result) == 1)
assert(type(result[0]) is bytes)
return result[0]
def get_dir(s, path, rev = None, want_props=False, want_contents=False, dirent_fields=None):
if rev is None:
rev = tuple()
else:
rev = (rev, )
if dirent_fields is None:
if want_contents:
dirent_fields = (b'kind', b'size', b'has-props', b'created-rev',
b'time', b'last-author',b'word')
else:
dirent_fields = tuple()
# We don't send want_iprops and don't expect a reply with it
result = send_command(s, (b'get-dir', (path, rev, want_props, want_contents, dirent_fields)))
assert(type(result) is tuple)
assert(len(result) >= 3)
assert(type(result[0]) is int)
assert(type(result[1]) is tuple)
assert(type(result[2]) is tuple)
revision = result[0]
props = None
if want_props:
assert(all(
type(p) is tuple and
len(p) == 2 and
type(p[0]) is SVNString and
type(p[1]) is SVNString for p in result[1]))
props = dict(result[1])
entries = None
if want_contents:
assert(all(type(entry) is tuple for entry in result[2]))
entries = list(parse_dirent(entry)
for entry in result[2])
return revision, props, entries
def get_file(s, path, rev, want_props=False, want_contents=False):
if rev is None:
rev = tuple()
else:
rev = (rev, )
# We don't send want_iprops and don't expect a reply with it
result = send_command(s, (b'get-file', (path, rev, want_props, want_contents)))
assert(type(result) is tuple)
assert(len(result) == 3)
assert(type(result[0]) is tuple)
assert(len(result[0]) == 0 or (len(result[0]) == 1 and type(result[0][0]) is SVNString))
assert(type(result[1]) is int)
assert(type(result[2]) is tuple)
checksum = result[0][0].decode('ascii') if len(result[0]) else None
revision = result[1]
props = None
if want_props:
assert(all(
type(p) is tuple and
len(p) == 2 and
type(p[0]) is SVNString and
type(p[1]) is SVNString for p in result[2]))
props = dict(result[2])
content = None
if want_contents:
md5 = hashlib.md5()
content = b''
for token in read_tokens(s):
if type(token) is not SVNString:
raise SVNProtocolError('Unexpected token when receiving contents: {0!r}'.format(token))
if len(token) == 0:
break
content += token
md5.update(token)
else:
raise SVNProtocolError('Unexpected end of stream when receiving contents')
result = read_response(s)
assert(type(result) is tuple and len(result) == 0)
digest = md5.hexdigest()
if checksum and digest != checksum:
raise SVNProtocolError('Invalid checksum while fetching contents: {0} (expected {1})'.format(digest, checksum))
return checksum, revision, props, content
def test():
server = ("svn.riscos.info", 3690)
url = 'svn://svn.riscos.info/gccsdk/tags/'
path = ''
s = socket.create_connection(server)
print(greeting(s, url))
print(auth(s))
print(repos_info(s))
rev = get_latest_rev(s)
print(rev)
kind = check_path(s, path, rev)
print(kind)
if kind == b'dir':
print(get_dir(s, path, rev, True, True))
elif kind == b'file':
print(get_file(s, path, rev, True, True))
if __name__ == '__main__':
test()