Files
git-dumper/git_dumper.py
2026-02-28 17:06:25 +01:00

821 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
from contextlib import closing
import argparse
import multiprocessing
import os
import os.path
import re
import socket
import subprocess
import sys
import traceback
import urllib.parse
import urllib3
import bs4
import dulwich.index
import dulwich.objects
import dulwich.pack
import requests
import socks
from requests_pkcs12 import Pkcs12Adapter
def printf(fmt, *args, file=sys.stdout):
    """ Write a printf-style message to ``file`` and flush it right away

    ``fmt`` is interpolated with ``args`` through the ``%`` operator only when
    at least one argument is given; otherwise it is written verbatim.
    """
    message = fmt % args if args else fmt
    file.write(message)
    file.flush()
def is_html(response):
    """ Tell whether the response declares an HTML body

    Only the ``Content-Type`` response header is inspected.
    """
    headers = response.headers
    if "Content-Type" not in headers:
        return False
    return "text/html" in headers["Content-Type"]
def is_safe_path(path):
    """ Prevent directory traversal attacks

    Return True when the relative ``path`` stays inside the directory it is
    joined to, i.e. it is not absolute and does not climb out through ``..``
    components. The user's home directory only serves as an arbitrary
    absolute anchor for that computation.
    """
    if path.startswith("/"):
        return False

    # The anchor must go through realpath() as well: the candidate below is
    # fully resolved, so a home directory reached through a symlink would
    # otherwise never be a prefix of it and every path would be rejected.
    safe_path = os.path.realpath(os.path.expanduser("~"))
    candidate = os.path.realpath(os.path.join(safe_path, path))
    return os.path.commonpath((candidate, safe_path)) == safe_path
def get_indexed_files(response):
    """ Return all the files in the directory index webpage

    Every ``<a>`` element of the page is inspected; a link is kept when its
    URL has a non-empty path accepted by is_safe_path() and carries neither
    a scheme nor a network location. The returned list holds the link paths.
    """
    soup = bs4.BeautifulSoup(response.text, "html.parser")
    targets = (
        urllib.parse.urlparse(anchor.get("href"))
        for anchor in soup.find_all("a")
    )
    return [
        target.path
        for target in targets
        if target.path
        and is_safe_path(target.path)
        and not target.scheme
        and not target.netloc
    ]
def verify_response(response):
    """ Check that the response looks like a successfully served raw file

    Returns a ``(valid, error_message)`` tuple. When ``valid`` is False,
    ``error_message`` is a printf-style template with two ``%s`` placeholders
    (base url and file path) left for the caller to fill in. When ``valid``
    is True the second element is simply ``True``.

    A response is rejected when its status code is not 200, when it
    announces an empty body, or when it is served as HTML.
    """
    if response.status_code != 200:
        return (
            False,
            "[-] %s/%s responded with status code {code}\n".format(
                code=response.status_code
            ),
        )

    headers = response.headers

    # Header values are strings: comparing against the integer 0 can never
    # match, so normalise to text before testing for an empty body.
    if (
        "Content-Length" in headers
        and str(headers["Content-Length"]).strip() == "0"
    ):
        return False, "[-] %s/%s responded with a zero-length body\n"

    if "Content-Type" in headers and "text/html" in headers["Content-Type"]:
        return False, "[-] %s/%s responded with HTML\n"

    return True, True
def create_intermediate_dirs(path):
    """ Create intermediate directories, if necessary

    Makes sure the parent directory of ``path`` exists. Nothing is done for a
    bare file name, and an already existing parent is left untouched.
    """
    parent = os.path.dirname(path)
    if not parent:
        return

    try:
        # exist_ok covers another worker creating the directory concurrently
        os.makedirs(parent, exist_ok=True)
    except FileExistsError:
        pass  # the parent exists but is not a directory; left as is
def get_referenced_sha1(obj_file):
    """ Return all the referenced SHA1 in the given object file

    ``obj_file`` is a parsed dulwich object. The result is a list of
    hexadecimal object ids as ``str``:

    * commit: its tree followed by its parents
    * tree: the object of every entry
    * tag: the object the tag points at
    * blob: nothing

    Any other object type is reported on stderr and terminates the process
    with exit status 1.
    """
    if isinstance(obj_file, dulwich.objects.Commit):
        return [obj_file.tree.decode()] + [
            parent.decode() for parent in obj_file.parents
        ]

    if isinstance(obj_file, dulwich.objects.Tree):
        return [item.sha.decode() for item in obj_file.iteritems()]

    if isinstance(obj_file, dulwich.objects.Blob):
        return []

    if isinstance(obj_file, dulwich.objects.Tag):
        # An annotated tag is an object of its own; the commit (or other
        # object) it names may be reachable through nothing else, so it has
        # to be followed. Tag.object is a (object class, sha) pair.
        _, target_sha = obj_file.object
        return [target_sha.decode()]

    printf("error: unexpected object type: %r\n", obj_file, file=sys.stderr)
    sys.exit(1)
class Worker(multiprocessing.Process):
    """ Worker for process_tasks

    A daemon process that calls init() once, then takes tasks from
    ``pending_tasks`` until it receives ``None``. For every task the list
    returned by do_task() is put on ``tasks_done``. Subclasses implement
    init() and do_task(); both receive the ``args`` tuple unpacked.
    """

    def __init__(self, pending_tasks, tasks_done, args):
        super().__init__()
        self.daemon = True
        self.pending_tasks = pending_tasks
        self.tasks_done = tasks_done
        self.args = args

    def run(self):
        # initialize process
        self.init(*self.args)

        # fetch and do tasks until the end signal (None) shows up
        while True:
            task = self.pending_tasks.get(block=True)
            if task is None:
                break
            self.tasks_done.put(self._attempt(task))

    def _attempt(self, task):
        """ Run one task; a task that raises is logged and yields no new tasks """
        try:
            outcome = self.do_task(task, *self.args)
        except Exception:
            printf("Task %s raised exception:\n", task, file=sys.stderr)
            traceback.print_exc()
            return []

        assert isinstance(
            outcome, list
        ), "do_task() should return a list of tasks"
        return outcome

    def init(self, *args):
        raise NotImplementedError

    def do_task(self, task, *args):
        raise NotImplementedError
def process_tasks(initial_tasks, worker, jobs, args=(), tasks_done=None):
    """ Process tasks in parallel

    ``jobs`` instances of ``worker`` are created with
    ``(pending queue, results queue, args)`` and started. Every task result
    is a list of follow-up tasks; those not seen before are queued in turn.
    ``tasks_done`` optionally lists tasks to treat as already handled.
    Returns None once every queued task has reported a result and all
    workers have been joined.

    NOTE(review): the loop counts one result per queued task, so a worker
    that exits without reporting would presumably leave it waiting forever.
    """
    if not initial_tasks:
        return

    seen = set(tasks_done) if tasks_done else set()
    todo_queue = multiprocessing.Queue()
    results_queue = multiprocessing.Queue()

    def submit(candidates):
        """ Queue the tasks never seen before and return how many were queued """
        queued = 0
        for candidate in candidates:
            assert candidate is not None
            if candidate in seen:
                continue
            todo_queue.put(candidate)
            queued += 1
            seen.add(candidate)
        return queued

    # add all initial tasks in the queue
    outstanding = submit(initial_tasks)

    # initialize processes, then launch them all
    workers = [worker(todo_queue, results_queue, args) for _ in range(jobs)]
    for proc in workers:
        proc.start()

    # collect task results
    while outstanding > 0:
        follow_ups = results_queue.get(block=True)
        outstanding -= 1
        outstanding += submit(follow_ups)

    # send termination signal (task=None)
    for _ in range(jobs):
        todo_queue.put(None)

    # join all
    for proc in workers:
        proc.join()
class DownloadWorker(Worker):
    """ Download a list of files

    The worker arguments are ``(url, directory, retry, timeout,
    http_headers[, client_cert_p12, client_cert_p12_password])``. Each task
    is a file path relative to ``url``; the file is stored under the same
    relative path inside ``directory``.
    """

    def init(self, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """ Create the HTTP session used by this worker process """
        self.session = requests.Session()
        self.session.verify = False  # TLS certificate verification is off
        self.session.headers = http_headers

        if client_cert_p12:
            # Pkcs12Adapter forwards its remaining keyword arguments to
            # HTTPAdapter, so the retry count applies with a client
            # certificate too instead of being silently dropped.
            adapter = Pkcs12Adapter(
                pkcs12_filename=client_cert_p12,
                pkcs12_password=client_cert_p12_password,
                max_retries=retry,
            )
        else:
            adapter = requests.adapters.HTTPAdapter(max_retries=retry)
        self.session.mount(url, adapter)

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """ Download ``url/filepath``; always returns an empty task list """
        if os.path.isfile(os.path.join(directory, filepath)):
            printf("[-] Already downloaded %s/%s\n", url, filepath)
            return []

        with closing(
            self.session.get(
                "%s/%s" % (url, filepath),
                allow_redirects=False,
                stream=True,
                timeout=timeout,
            )
        ) as response:
            printf(
                "[-] Fetching %s/%s [%d]\n",
                url,
                filepath,
                response.status_code,
            )

            valid, error_message = verify_response(response)
            if not valid:
                printf(error_message, url, filepath, file=sys.stderr)
                return []

            abspath = os.path.abspath(os.path.join(directory, filepath))
            create_intermediate_dirs(abspath)

            # write file
            try:
                with open(abspath, "wb") as f:
                    for chunk in response.iter_content(4096):
                        f.write(chunk)
            except BaseException:
                # A truncated file would be taken for "Already downloaded"
                # by the check above on a later run, so drop it.
                if os.path.isfile(abspath):
                    os.remove(abspath)
                raise

            return []
class RecursiveDownloadWorker(DownloadWorker):
    """ Download a directory recursively

    A task is a path relative to ``url``. A path ending with ``/`` is
    fetched as a directory index and yields one new task per listed entry;
    any other path is downloaded as a file into ``directory``.
    """

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """ Fetch ``url/filepath`` and return the list of tasks it leads to

        The two client certificate parameters are accepted (and unused here,
        the session being configured in init()) so that this worker takes
        the same argument tuple as the other download workers.
        """
        if os.path.isfile(os.path.join(directory, filepath)):
            printf("[-] Already downloaded %s/%s\n", url, filepath)
            return []

        with closing(
            self.session.get(
                "%s/%s" % (url, filepath),
                allow_redirects=False,
                stream=True,
                timeout=timeout,
            )
        ) as response:
            printf(
                "[-] Fetching %s/%s [%d]\n",
                url,
                filepath,
                response.status_code,
            )

            # a redirect to the same path with a trailing slash means the
            # path is a directory: retry it as a directory index
            if (
                response.status_code in (301, 302)
                and "Location" in response.headers
                and response.headers["Location"].endswith(filepath + "/")
            ):
                return [filepath + "/"]

            if filepath.endswith("/"):  # directory index
                # Checked explicitly rather than with assert (stripped under
                # -O); error pages must not be parsed as a listing either.
                if response.status_code != 200 or not is_html(response):
                    printf(
                        "[-] %s/%s is not a directory index\n",
                        url,
                        filepath,
                        file=sys.stderr,
                    )
                    return []

                return [
                    filepath + filename
                    for filename in get_indexed_files(response)
                ]

            # file
            valid, error_message = verify_response(response)
            if not valid:
                printf(error_message, url, filepath, file=sys.stderr)
                return []

            abspath = os.path.abspath(os.path.join(directory, filepath))
            create_intermediate_dirs(abspath)

            # write file
            try:
                with open(abspath, "wb") as f:
                    for chunk in response.iter_content(4096):
                        f.write(chunk)
            except BaseException:
                # A truncated file would be taken for "Already downloaded"
                # by the check above on a later run, so drop it.
                if os.path.isfile(abspath):
                    os.remove(abspath)
                raise

            return []
class FindRefsWorker(DownloadWorker):
    """ Find refs/

    Each task is a file path relative to ``url``. The file is downloaded and
    scanned for ``refs/...`` names; every name found yields two new tasks,
    ``.git/<ref>`` and ``.git/logs/<ref>``.
    """

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """ Download ``url/filepath`` and return the ref files it mentions """
        response = self.session.get(
            "%s/%s" % (url, filepath), allow_redirects=False, timeout=timeout
        )
        printf(
            "[-] Fetching %s/%s [%d]\n", url, filepath, response.status_code
        )

        valid, error_message = verify_response(response)
        if not valid:
            printf(error_message, url, filepath, file=sys.stderr)
            return []

        abspath = os.path.abspath(os.path.join(directory, filepath))
        create_intermediate_dirs(abspath)

        # Write the bytes exactly as received: text mode would re-encode them
        # with the platform default encoding and translate newlines.
        with open(abspath, "wb") as f:
            f.write(response.content)

        # find refs
        tasks = []
        for match in re.finditer(
            r"(refs(/[a-zA-Z0-9\-\.\_\*]+)+)", response.text
        ):
            ref = match.group(1)
            # names ending with "*" are skipped, as are unsafe paths
            if not ref.endswith("*") and is_safe_path(ref):
                tasks.append(".git/%s" % ref)
                tasks.append(".git/logs/%s" % ref)

        return tasks
class FindObjectsWorker(DownloadWorker):
    """ Find objects

    Each task is a 40-character object id. The matching loose object file
    ``.git/objects/xx/yyyy...`` is downloaded unless already present, then
    parsed to discover the objects it references.
    """

    def do_task(self, obj, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """ Fetch one object and return the ids of the objects it refers to """
        filepath = ".git/objects/%s/%s" % (obj[:2], obj[2:])
        local_path = os.path.abspath(os.path.join(directory, filepath))

        already_there = os.path.isfile(os.path.join(directory, filepath))
        if already_there:
            printf("[-] Already downloaded %s/%s\n", url, filepath)

        if not already_there:
            response = self.session.get(
                "%s/%s" % (url, filepath),
                allow_redirects=False,
                timeout=timeout,
            )
            printf(
                "[-] Fetching %s/%s [%d]\n",
                url,
                filepath,
                response.status_code,
            )

            valid, error_message = verify_response(response)
            if not valid:
                printf(error_message, url, filepath, file=sys.stderr)
                return []

            create_intermediate_dirs(local_path)

            # write file
            with open(local_path, "wb") as out:
                out.write(response.content)

        # parse object file to find other objects
        parsed = dulwich.objects.ShaFile.from_path(local_path)
        return get_referenced_sha1(parsed)
def sanitize_file(filepath):
    """ Inplace comment out possibly unsafe lines based on regex

    Every occurrence of one of the keywords below (matched case
    insensitively) gets ``# `` inserted right in front of it, which turns
    the rest of that line into a comment. ``filepath`` must be an existing
    file. A warning is printed when the file had to be modified.
    """
    assert os.path.isfile(filepath), "%s is not a file" % filepath

    # No "^\s*" prefix: in "^\s*fsmonitor|sshcommand|..." the anchor bound to
    # the first alternative only and, without re.MULTILINE, matched at the
    # very start of the file, so "fsmonitor" was practically never caught.
    UNSAFE = r"fsmonitor|sshcommand|askpass|editor|pager"

    with open(filepath, 'r+') as f:
        content = f.read()
        modified_content = re.sub(UNSAFE, r'# \g<0>', content, flags=re.IGNORECASE)
        if content != modified_content:
            f.seek(0)
            f.write(modified_content)
            f.truncate()
            printf("Warning: '%s' file was altered\n" % filepath)
def fetch_git(url, directory, jobs, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
    """ Dump a git repository into the output directory

    Parameters:
        url: address of the website; a trailing ``/``, ``.git`` or
            ``.git/HEAD`` is stripped to obtain the base url.
        directory: existing output directory.
        jobs: number of worker processes (>= 1).
        retry: number of request attempts (>= 1).
        timeout: request timeout in seconds (>= 1).
        http_headers: headers sent with every request.
        client_cert_p12: optional PKCS#12 client certificate file.
        client_cert_p12_password: password of that certificate.

    Returns 0 when the dump went through and 1 when ``.git/HEAD`` cannot be
    fetched or is not a git HEAD file.

    When the server exposes a directory listing for ``.git/`` the whole
    directory is mirrored; otherwise well-known files, refs, packs and
    objects are discovered step by step. In both cases the process working
    directory is changed to ``directory`` and ``git checkout .`` is run.
    """
    assert os.path.isdir(directory), "%s is not a directory" % directory
    assert jobs >= 1, "invalid number of jobs"
    assert retry >= 1, "invalid number of retries"
    assert timeout >= 1, "invalid timeout"

    if os.listdir(directory):
        printf("Warning: Destination '%s' is not empty\n", directory)

    # find base url
    url = url.rstrip("/")
    if url.endswith("HEAD"):
        url = url[:-4]
    url = url.rstrip("/")
    if url.endswith(".git"):
        url = url[:-4]
    url = url.rstrip("/")

    # The adapter is mounted on the normalised base url: requests picks an
    # adapter by url prefix, so mounting on e.g. ".../.git/HEAD" would leave
    # the other requests below without retries or client certificate.
    session = requests.Session()
    session.verify = False
    session.headers = http_headers
    if client_cert_p12:
        session.mount(
            url,
            Pkcs12Adapter(
                pkcs12_filename=client_cert_p12,
                pkcs12_password=client_cert_p12_password,
                max_retries=retry,
            ),
        )
    else:
        session.mount(url, requests.adapters.HTTPAdapter(max_retries=retry))

    # check for /.git/HEAD
    printf("[-] Testing %s/.git/HEAD ", url)
    response = session.get(
        "%s/.git/HEAD" % url,
        timeout=timeout,
        allow_redirects=False
    )
    printf("[%d]\n", response.status_code)

    valid, error_message = verify_response(response)
    if not valid:
        # the template already inserts the "/" between url and path
        printf(error_message, url, ".git/HEAD", file=sys.stderr)
        return 1
    elif not re.match(r"^(ref:.*|[0-9a-f]{40}$)", response.text.strip()):
        printf(
            "error: %s/.git/HEAD is not a git HEAD file\n",
            url,
            file=sys.stderr,
        )
        return 1

    # set up environment to ensure proxy usage
    environment = os.environ.copy()
    configured_proxy = socks.getdefaultproxy()
    if configured_proxy is not None:
        # Keyed by the PySocks constants: indexing a list with them overran
        # it for HTTP proxies. ALL_PROXY takes a plain proxy url, and the
        # schemes are the ones curl documents for remote name resolution.
        proxy_schemes = {
            socks.PROXY_TYPE_HTTP: "http",
            socks.PROXY_TYPE_SOCKS4: "socks4a",
            socks.PROXY_TYPE_SOCKS5: "socks5h",
        }
        environment["ALL_PROXY"] = "%s://%s:%s" % (
            proxy_schemes[configured_proxy[0]],
            configured_proxy[1],
            configured_proxy[2],
        )

    # check for directory listing
    printf("[-] Testing %s/.git/ ", url)
    response = session.get(
        "%s/.git/" % url,
        timeout=timeout,  # without it an unresponsive server blocks forever
        allow_redirects=False
    )
    printf("[%d]\n", response.status_code)

    if (
        response.status_code == 200
        and is_html(response)
        and "HEAD" in get_indexed_files(response)
    ):
        printf("[-] Fetching .git recursively\n")
        # NOTE(review): the client certificate arguments are not forwarded
        # to the recursive workers here - confirm whether that is intended.
        process_tasks(
            [".git/", ".gitignore"],
            RecursiveDownloadWorker,
            jobs,
            args=(url, directory, retry, timeout, http_headers),
        )

        os.chdir(directory)

        # the config file may be absent from the dump
        if os.path.isfile(".git/config"):
            printf("[-] Sanitizing .git/config\n")
            sanitize_file(".git/config")

        printf("[-] Running git checkout .\n")
        subprocess.check_call(["git", "checkout", "."], env=environment)
        return 0

    # no directory listing
    printf("[-] Fetching common files\n")
    tasks = [
        ".gitignore",
        ".git/COMMIT_EDITMSG",
        ".git/description",
        ".git/hooks/applypatch-msg.sample",
        ".git/hooks/commit-msg.sample",
        ".git/hooks/post-commit.sample",
        ".git/hooks/post-receive.sample",
        ".git/hooks/post-update.sample",
        ".git/hooks/pre-applypatch.sample",
        ".git/hooks/pre-commit.sample",
        ".git/hooks/pre-push.sample",
        ".git/hooks/pre-rebase.sample",
        ".git/hooks/pre-receive.sample",
        ".git/hooks/prepare-commit-msg.sample",
        ".git/hooks/update.sample",
        ".git/index",
        ".git/info/exclude",
        ".git/objects/info/packs",
    ]
    process_tasks(
        tasks,
        DownloadWorker,
        jobs,
        args=(url, directory, retry, timeout, http_headers, client_cert_p12, client_cert_p12_password),
    )

    # find refs
    printf("[-] Finding refs/\n")
    tasks = [
        ".git/FETCH_HEAD",
        ".git/HEAD",
        ".git/ORIG_HEAD",
        ".git/config",
        ".git/info/refs",
        ".git/logs/HEAD",
        ".git/logs/refs/heads/main",
        ".git/logs/refs/heads/master",
        ".git/logs/refs/heads/staging",
        ".git/logs/refs/heads/production",
        ".git/logs/refs/heads/development",
        ".git/logs/refs/remotes/origin/HEAD",
        ".git/logs/refs/remotes/origin/main",
        ".git/logs/refs/remotes/origin/master",
        ".git/logs/refs/remotes/origin/staging",
        ".git/logs/refs/remotes/origin/production",
        ".git/logs/refs/remotes/origin/development",
        ".git/logs/refs/stash",
        ".git/packed-refs",
        ".git/refs/heads/main",
        ".git/refs/heads/master",
        ".git/refs/heads/staging",
        ".git/refs/heads/production",
        ".git/refs/heads/development",
        ".git/refs/remotes/origin/HEAD",
        ".git/refs/remotes/origin/main",
        ".git/refs/remotes/origin/master",
        ".git/refs/remotes/origin/staging",
        ".git/refs/remotes/origin/production",
        ".git/refs/remotes/origin/development",
        ".git/refs/stash",
        ".git/refs/wip/wtree/refs/heads/main",
        ".git/refs/wip/wtree/refs/heads/master",
        ".git/refs/wip/wtree/refs/heads/staging",
        ".git/refs/wip/wtree/refs/heads/production",
        ".git/refs/wip/wtree/refs/heads/development",
        ".git/refs/wip/index/refs/heads/main",
        ".git/refs/wip/index/refs/heads/master",
        ".git/refs/wip/index/refs/heads/staging",
        ".git/refs/wip/index/refs/heads/production",
        ".git/refs/wip/index/refs/heads/development"
    ]
    process_tasks(
        tasks,
        FindRefsWorker,
        jobs,
        args=(url, directory, retry, timeout, http_headers, client_cert_p12, client_cert_p12_password),
    )

    # find packs
    printf("[-] Finding packs\n")
    tasks = []

    # use .git/objects/info/packs to find packs
    info_packs_path = os.path.join(
        directory, ".git", "objects", "info", "packs"
    )
    if os.path.exists(info_packs_path):
        # only ASCII pack names are searched for: decode leniently
        with open(info_packs_path, "r", encoding="utf-8", errors="replace") as f:
            info_packs = f.read()

        for sha1 in re.findall(r"pack-([a-f0-9]{40})\.pack", info_packs):
            tasks.append(".git/objects/pack/pack-%s.idx" % sha1)
            tasks.append(".git/objects/pack/pack-%s.pack" % sha1)

    process_tasks(
        tasks,
        DownloadWorker,
        jobs,
        args=(url, directory, retry, timeout, http_headers, client_cert_p12, client_cert_p12_password),
    )

    # find objects
    printf("[-] Finding objects\n")
    objs = set()
    packed_objs = set()

    # .git/packed-refs, .git/info/refs, .git/refs/*, .git/logs/*
    files = [
        os.path.join(directory, ".git", "packed-refs"),
        os.path.join(directory, ".git", "info", "refs"),
        os.path.join(directory, ".git", "FETCH_HEAD"),
        os.path.join(directory, ".git", "ORIG_HEAD"),
    ]
    for dirpath, _, filenames in os.walk(
        os.path.join(directory, ".git", "refs")
    ):
        for filename in filenames:
            files.append(os.path.join(dirpath, filename))
    for dirpath, _, filenames in os.walk(
        os.path.join(directory, ".git", "logs")
    ):
        for filename in filenames:
            files.append(os.path.join(dirpath, filename))

    for filepath in files:
        if not os.path.exists(filepath):
            continue

        # These files can hold arbitrary text next to the object ids (logs
        # carry names and messages); undecodable bytes must not abort the
        # dump since only hexadecimal ids are extracted.
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()

        for obj in re.findall(r"(^|\s)([a-f0-9]{40})($|\s)", content):
            obj = obj[1]
            objs.add(obj)

    # use .git/index to find objects
    index_path = os.path.join(directory, ".git", "index")
    if os.path.exists(index_path):
        index = dulwich.index.Index(index_path)

        for entry in index.iterobjects():
            objs.add(entry[1].decode())

    # use packs to find more objects to fetch, and objects that are packed
    pack_file_dir = os.path.join(directory, ".git", "objects", "pack")
    if os.path.isdir(pack_file_dir):
        for filename in os.listdir(pack_file_dir):
            if filename.startswith("pack-") and filename.endswith(".pack"):
                pack_data_path = os.path.join(pack_file_dir, filename)
                pack_idx_path = os.path.join(
                    pack_file_dir, filename[:-5] + ".idx"
                )
                pack_data = dulwich.pack.PackData(pack_data_path, object_format=dulwich.object_format.DEFAULT_OBJECT_FORMAT)
                pack_idx = dulwich.pack.load_pack_index(pack_idx_path, object_format=dulwich.object_format.DEFAULT_OBJECT_FORMAT)
                pack = dulwich.pack.Pack.from_objects(pack_data, pack_idx)

                for obj_file in pack.iterobjects():
                    packed_objs.add(obj_file.sha().hexdigest())
                    objs |= set(get_referenced_sha1(obj_file))

    # fetch all objects; packed ones are passed as already done
    printf("[-] Fetching objects\n")
    process_tasks(
        objs,
        FindObjectsWorker,
        jobs,
        args=(url, directory, retry, timeout, http_headers, client_cert_p12, client_cert_p12_password),
        tasks_done=packed_objs,
    )

    # git checkout
    printf("[-] Running git checkout .\n")
    os.chdir(directory)

    # .git/config is only one of the guessed files and may be missing;
    # sanitize_file() asserts that its argument exists
    if os.path.isfile(".git/config"):
        sanitize_file(".git/config")

    # ignore errors
    subprocess.call(
        ["git", "checkout", "."],
        stderr=subprocess.DEVNULL,
        env=environment
    )

    return 0
def main():
    """ Command line entry point

    Parses and validates the arguments, installs the proxy if one is given,
    creates the output directory when needed and exits the process with the
    status returned by fetch_git(). Invalid arguments are reported through
    ``parser.error``.
    """
    parser = argparse.ArgumentParser(
        usage="git-dumper [options] URL DIR",
        description="Dump a git repository from a website.",
    )
    parser.add_argument("url", metavar="URL", help="url")
    parser.add_argument("directory", metavar="DIR", help="output directory")
    parser.add_argument("--proxy", help="use the specified proxy")
    parser.add_argument("--client-cert-p12", help="client certificate in PKCS#12")
    parser.add_argument("--client-cert-p12-password", help="password for the client certificate")
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=10,
        help="number of simultaneous requests",
    )
    parser.add_argument(
        "-r",
        "--retry",
        type=int,
        default=3,
        help="number of request attempts before giving up",
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=3,
        help="maximum time in seconds before giving up",
    )
    parser.add_argument(
        "-u",
        "--user-agent",
        type=str,
        default="Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0",
        help="user-agent to use for requests",
    )
    parser.add_argument(
        "-H",
        "--header",
        type=str,
        action="append",
        help="additional http headers, e.g `NAME=VALUE`",
    )
    args = parser.parse_args()

    # jobs
    if args.jobs < 1:
        parser.error("invalid number of jobs, got `%d`" % args.jobs)

    # retry
    if args.retry < 1:
        parser.error("invalid number of retries, got `%d`" % args.retry)

    # timeout
    if args.timeout < 1:
        parser.error("invalid timeout, got `%d`" % args.timeout)

    # header
    http_headers = {"User-Agent": args.user_agent}
    if args.header:
        for header in args.header:
            tokens = header.split("=", maxsplit=1)
            if len(tokens) != 2:
                parser.error(
                    "http header must have the form NAME=VALUE, got `%s`"
                    % header
                )
            name, value = tokens
            http_headers[name.strip()] = value.strip()

    # proxy
    if args.proxy:
        proxy_valid = False

        # The "//" after the socks schemes is optional so that both
        # "socks5:host:port" and "socks5://host:port" give the same host;
        # the url form used to be parsed with "//host" as the host name.
        # A bare "host:port" is treated as SOCKS5.
        for pattern, proxy_type in [
            (r"^socks5:(?://)?(.*):(\d+)$", socks.PROXY_TYPE_SOCKS5),
            (r"^socks4:(?://)?(.*):(\d+)$", socks.PROXY_TYPE_SOCKS4),
            (r"^http://(.*):(\d+)$", socks.PROXY_TYPE_HTTP),
            (r"^(.*):(\d+)$", socks.PROXY_TYPE_SOCKS5),
        ]:
            m = re.match(pattern, args.proxy)
            if m:
                socks.setdefaultproxy(proxy_type, m.group(1), int(m.group(2)))
                # route every socket of this process through the proxy
                socket.socket = socks.socksocket
                proxy_valid = True
                break

        if not proxy_valid:
            parser.error("invalid proxy, got `%s`" % args.proxy)

    # output directory
    if not os.path.exists(args.directory):
        os.makedirs(args.directory)

    if not os.path.isdir(args.directory):
        parser.error("`%s` is not a directory" % args.directory)

    # client certificate
    if args.client_cert_p12:
        if not os.path.exists(args.client_cert_p12):
            parser.error(
                "client certificate `%s` does not exist" % args.client_cert_p12
            )

        if not os.path.isfile(args.client_cert_p12):
            parser.error(
                "client certificate `%s` is not a file" % args.client_cert_p12
            )

        if args.client_cert_p12_password is None:
            parser.error("client certificate password is required")

    # requests are made with certificate verification disabled
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # fetch everything
    sys.exit(
        fetch_git(
            args.url,
            args.directory,
            args.jobs,
            args.retry,
            args.timeout,
            http_headers,
            args.client_cert_p12,
            args.client_cert_p12_password
        )
    )
# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()