mirror of
https://github.com/arthaud/git-dumper.git
synced 2026-05-04 11:32:26 +00:00
821 lines
25 KiB
Python
Executable File
821 lines
25 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from contextlib import closing
|
|
import argparse
|
|
import multiprocessing
|
|
import os
|
|
import os.path
|
|
import re
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import urllib.parse
|
|
|
|
import urllib3
|
|
|
|
import bs4
|
|
import dulwich.index
|
|
import dulwich.objects
|
|
import dulwich.pack
|
|
import requests
|
|
import socks
|
|
from requests_pkcs12 import Pkcs12Adapter
|
|
|
|
|
|
def printf(fmt, *args, file=sys.stdout):
    """Write a printf-style message to ``file`` and flush it immediately.

    ``fmt`` is interpolated with ``args`` through the ``%`` operator only when
    at least one argument is supplied, so a message passed without arguments
    is written verbatim (literal ``%`` characters included). No newline is
    appended.
    """
    message = fmt % args if args else fmt
    file.write(message)
    file.flush()
|
|
|
|
|
|
def is_html(response):
    """Tell whether ``response`` declares an HTML body.

    Returns True when the response has a ``Content-Type`` header whose value
    contains ``text/html``, and False otherwise (including when the header is
    absent).
    """
    headers = response.headers
    if "Content-Type" not in headers:
        return False
    return "text/html" in headers["Content-Type"]
|
|
|
|
|
|
def is_safe_path(path):
    """Return True if the relative ``path`` cannot escape its base directory.

    The check guards against directory traversal: ``path`` is joined onto a
    reference directory (the current user's home directory), resolved, and
    accepted only if the result is still located inside that directory.

    Absolute paths (starting with ``/``) are always rejected.
    """
    if path.startswith("/"):
        return False

    # The base must be resolved too: realpath() resolves symlinks in the
    # candidate, so comparing against an unresolved, symlinked home directory
    # would reject every path.
    base = os.path.realpath(os.path.expanduser("~"))
    candidate = os.path.realpath(os.path.join(base, path))

    try:
        return os.path.commonpath((candidate, base)) == base
    except ValueError:
        # commonpath() refuses to compare unrelated paths (e.g. different
        # drives on Windows); such a candidate is outside the base.
        return False
|
|
|
|
|
|
def get_indexed_files(response):
    """List the relative entries linked from a directory index page.

    The body of ``response`` is parsed as HTML and every ``<a>`` element is
    inspected. A link is kept when it has a non-empty path that passes
    ``is_safe_path`` and carries neither a scheme nor a network location.
    Returns the kept paths as a list, in document order.
    """
    soup = bs4.BeautifulSoup(response.text, "html.parser")
    parsed_links = (
        urllib.parse.urlparse(anchor.get("href"))
        for anchor in soup.find_all("a")
    )
    return [
        parsed.path
        for parsed in parsed_links
        if parsed.path
        and is_safe_path(parsed.path)
        and not parsed.scheme
        and not parsed.netloc
    ]
|
|
|
|
|
|
def verify_response(response):
    """Check that ``response`` looks like a successfully served raw file.

    Returns a ``(valid, error_message)`` pair. On success the pair is
    ``(True, True)``. On failure ``valid`` is False and ``error_message`` is a
    printf-style template that still contains two ``%s`` placeholders (base
    url and file path) for the caller to fill in.

    A response is rejected when its status code is not 200, when it
    announces an empty body, or when it is an HTML page.
    """
    if response.status_code != 200:
        return (
            False,
            "[-] %s/%s responded with status code {code}\n".format(
                code=response.status_code
            ),
        )

    headers = response.headers

    # Header values are strings, so the length has to be compared textually;
    # going through str() keeps an integer 0 working as well.
    if (
        "Content-Length" in headers
        and str(headers["Content-Length"]).strip() == "0"
    ):
        return False, "[-] %s/%s responded with a zero-length body\n"

    if "Content-Type" in headers and "text/html" in headers["Content-Type"]:
        return False, "[-] %s/%s responded with HTML\n"

    return True, True
|
|
|
|
|
|
def create_intermediate_dirs(path):
    """Make sure the directory that will contain ``path`` exists.

    Only the parent directories are created, never ``path`` itself. Nothing
    happens when ``path`` has no directory component or when the parent
    already exists.
    """
    parent = os.path.dirname(path)
    if not parent or os.path.exists(parent):
        return

    try:
        os.makedirs(parent)
    except FileExistsError:
        # another worker created it between the check and the call
        pass
|
|
|
|
|
|
def get_referenced_sha1(obj_file):
    """Return the SHA1 of every object directly referenced by ``obj_file``.

    ``obj_file`` is a parsed dulwich object:

    * a commit references its tree and its parent commits;
    * a tree references each of its entries;
    * an annotated tag references the object it points to;
    * a blob references nothing.

    The identifiers are returned as a list of hexadecimal strings. Any other
    object type is reported on stderr and terminates the process with exit
    status 1.
    """
    if isinstance(obj_file, dulwich.objects.Commit):
        return [obj_file.tree.decode()] + [
            parent.decode() for parent in obj_file.parents
        ]

    if isinstance(obj_file, dulwich.objects.Tree):
        return [item.sha.decode() for item in obj_file.iteritems()]

    if isinstance(obj_file, dulwich.objects.Tag):
        # Tag.object is a (object class, sha) pair; without following it, an
        # object that is reachable only through an annotated tag would never
        # be downloaded.
        _, target_sha = obj_file.object
        return [target_sha.decode()]

    if isinstance(obj_file, dulwich.objects.Blob):
        return []

    printf("error: unexpected object type: %r\n", obj_file, file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
class Worker(multiprocessing.Process):
    """Base class of the worker processes driven by process_tasks.

    A worker repeatedly takes a task from ``pending_tasks``, runs it through
    ``do_task`` and puts the list of follow-up tasks it produced on
    ``tasks_done``. A task equal to ``None`` tells the worker to stop.

    Subclasses implement ``init`` (called once when the process starts) and
    ``do_task``; both receive the extra ``args`` given to the constructor.
    """

    def __init__(self, pending_tasks, tasks_done, args):
        super().__init__(daemon=True)
        self.pending_tasks = pending_tasks
        self.tasks_done = tasks_done
        self.args = args

    def run(self):
        """Process entry point: initialise, then serve tasks until told to stop."""
        self.init(*self.args)

        while True:
            task = self.pending_tasks.get(block=True)
            if task is None:  # end signal
                break
            self.tasks_done.put(self._execute(task))

    def _execute(self, task):
        """Run one task and return its follow-up tasks ([] if it raised)."""
        try:
            result = self.do_task(task, *self.args)
        except Exception:
            # A failing task must still produce a result, otherwise the
            # parent would wait for it forever.
            printf("Task %s raised exception:\n", task, file=sys.stderr)
            traceback.print_exc()
            return []

        assert isinstance(
            result, list
        ), "do_task() should return a list of tasks"
        return result

    def init(self, *args):
        """Per-process set-up hook; must be overridden."""
        raise NotImplementedError

    def do_task(self, task, *args):
        """Handle one task and return a list of new tasks; must be overridden."""
        raise NotImplementedError
|
|
|
|
|
|
def process_tasks(initial_tasks, worker, jobs, args=(), tasks_done=None):
    """Run tasks in parallel until no task is left.

    ``initial_tasks`` seeds the work queue. ``worker`` is the class of the
    workers to start, ``jobs`` how many of them, and ``args`` the extra
    arguments handed to each worker. Every task result is a list of new
    tasks; a task is scheduled at most once, and anything listed in
    ``tasks_done`` is considered already handled and never scheduled.

    Returns None once every scheduled task has reported back and all the
    workers have been joined. Nothing is started when ``initial_tasks`` is
    empty.
    """
    if not initial_tasks:
        return

    seen = set(tasks_done) if tasks_done else set()
    todo_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    def schedule(candidates):
        """Queue the not-yet-seen candidates; return how many were queued."""
        scheduled = 0
        for candidate in candidates:
            assert candidate is not None
            if candidate in seen:
                continue
            todo_queue.put(candidate)
            scheduled += 1
            seen.add(candidate)
        return scheduled

    # seed the queue with the initial tasks
    outstanding = schedule(initial_tasks)

    # create the workers, then launch them all
    pool = [worker(todo_queue, result_queue, args) for _ in range(jobs)]
    for proc in pool:
        proc.start()

    # each result settles one task and may schedule new ones
    while outstanding > 0:
        follow_ups = result_queue.get(block=True)
        outstanding += schedule(follow_ups) - 1

    # one termination signal (None) per worker
    for _ in range(jobs):
        todo_queue.put(None)

    for proc in pool:
        proc.join()
|
|
|
|
|
|
class DownloadWorker(Worker):
    """Worker that downloads one file per task.

    A task is a file path relative to the base ``url``; the file is saved
    under the same relative path inside ``directory``. Downloading a file
    never produces follow-up tasks.
    """

    def init(self, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """Create the HTTP session used by this worker process.

        Certificate verification is turned off and ``http_headers`` replaces
        the session's default headers. The adapter mounted on ``url`` retries
        failed requests up to ``retry`` times; when ``client_cert_p12`` is
        given it also presents that PKCS#12 client certificate.
        """
        self.session = requests.Session()
        self.session.verify = False
        self.session.headers = http_headers

        if client_cert_p12:
            # Pkcs12Adapter forwards the remaining keyword arguments to
            # HTTPAdapter, so the retry policy applies with a certificate too.
            adapter = Pkcs12Adapter(
                pkcs12_filename=client_cert_p12,
                pkcs12_password=client_cert_p12_password,
                max_retries=retry,
            )
        else:
            adapter = requests.adapters.HTTPAdapter(max_retries=retry)
        self.session.mount(url, adapter)

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """Download ``url``/``filepath`` into ``directory``; always return [].

        A file that already exists locally is not requested again, and a
        response rejected by ``verify_response`` is reported on stderr and
        not saved.
        """
        if os.path.isfile(os.path.join(directory, filepath)):
            printf("[-] Already downloaded %s/%s\n", url, filepath)
            return []

        request = self.session.get(
            "%s/%s" % (url, filepath),
            allow_redirects=False,
            stream=True,
            timeout=timeout,
        )
        with closing(request) as response:
            printf(
                "[-] Fetching %s/%s [%d]\n",
                url,
                filepath,
                response.status_code,
            )

            valid, error_message = verify_response(response)
            if not valid:
                printf(error_message, url, filepath, file=sys.stderr)
                return []

            abspath = os.path.abspath(os.path.join(directory, filepath))
            create_intermediate_dirs(abspath)

            # stream the body to disk in small chunks
            with open(abspath, "wb") as f:
                for chunk in response.iter_content(4096):
                    f.write(chunk)

            return []
|
|
|
|
|
|
class RecursiveDownloadWorker(DownloadWorker):
    """Worker that downloads a directory tree through directory listings.

    A task is either a file path or, when it ends with ``/``, a directory
    whose index page is parsed to discover further tasks.
    """

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """Fetch ``url``/``filepath`` and return the tasks it leads to.

        * A 301/302 redirect towards ``filepath + "/"`` means the path is a
          directory: that directory is returned as the next task.
        * For a directory, the entries of its index page are returned, each
          prefixed with ``filepath``.
        * For a file, the content is saved under ``directory`` and no new
          task is returned.

        The two client-certificate parameters are accepted so the signature
        matches the other download workers; this method does not read them.
        """
        if os.path.isfile(os.path.join(directory, filepath)):
            printf("[-] Already downloaded %s/%s\n", url, filepath)
            return []

        request = self.session.get(
            "%s/%s" % (url, filepath),
            allow_redirects=False,
            stream=True,
            timeout=timeout,
        )
        with closing(request) as response:
            printf(
                "[-] Fetching %s/%s [%d]\n",
                url,
                filepath,
                response.status_code,
            )

            if (
                response.status_code in (301, 302)
                and "Location" in response.headers
                and response.headers["Location"].endswith(filepath + "/")
            ):
                return [filepath + "/"]

            if filepath.endswith("/"):  # directory index
                # Explicit check rather than an assert: asserts disappear
                # under `python -O`, and a clear message is more useful than
                # a traceback.
                if not is_html(response):
                    printf(
                        "[-] %s/%s did not respond with an HTML directory index\n",
                        url,
                        filepath,
                        file=sys.stderr,
                    )
                    return []

                return [
                    filepath + filename
                    for filename in get_indexed_files(response)
                ]

            # regular file
            valid, error_message = verify_response(response)
            if not valid:
                printf(error_message, url, filepath, file=sys.stderr)
                return []

            abspath = os.path.abspath(os.path.join(directory, filepath))
            create_intermediate_dirs(abspath)

            # stream the body to disk in small chunks
            with open(abspath, "wb") as f:
                for chunk in response.iter_content(4096):
                    f.write(chunk)

            return []
|
|
|
|
|
|
class FindRefsWorker(DownloadWorker):
    """Worker that downloads a file and looks for git references in it."""

    def do_task(self, filepath, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """Download ``url``/``filepath`` and return tasks for the refs it names.

        The file is saved under ``directory``. Its text is then scanned for
        ``refs/...`` names; for each name that does not end with ``*`` and
        passes ``is_safe_path``, both ``.git/<ref>`` and ``.git/logs/<ref>``
        are returned as new tasks. An invalid response is reported on stderr
        and yields no task.
        """
        response = self.session.get(
            "%s/%s" % (url, filepath), allow_redirects=False, timeout=timeout
        )
        printf(
            "[-] Fetching %s/%s [%d]\n", url, filepath, response.status_code
        )

        valid, error_message = verify_response(response)
        if not valid:
            printf(error_message, url, filepath, file=sys.stderr)
            return []

        abspath = os.path.abspath(os.path.join(directory, filepath))
        create_intermediate_dirs(abspath)

        # Save the exact bytes that were served: going through the decoded
        # text in text mode could change them (guessed encoding, newline
        # translation) or fail on characters the local encoding cannot
        # represent.
        with open(abspath, "wb") as f:
            f.write(response.content)

        # find refs
        tasks = []
        for match in re.findall(
            r"(refs(/[a-zA-Z0-9\-\.\_\*]+)+)", response.text
        ):
            ref = match[0]  # group 1 is the complete ref name
            if not ref.endswith("*") and is_safe_path(ref):
                tasks.append(".git/%s" % ref)
                tasks.append(".git/logs/%s" % ref)

        return tasks
|
|
|
|
|
|
class FindObjectsWorker(DownloadWorker):
    """Worker that downloads a loose object and follows its references."""

    def do_task(self, obj, url, directory, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
        """Fetch the object ``obj`` and return the SHA1s it references.

        ``obj`` is a hexadecimal object id; the object is looked up at
        ``.git/objects/<first two characters>/<rest>``. It is downloaded
        unless a local copy already exists, then parsed to list the objects
        it points to. An invalid response is reported on stderr and yields
        no task.
        """
        filepath = ".git/objects/%s/%s" % (obj[:2], obj[2:])
        local_copy = os.path.abspath(os.path.join(directory, filepath))

        if os.path.isfile(local_copy):
            printf("[-] Already downloaded %s/%s\n", url, filepath)
        elif not self._download_object(url, filepath, local_copy, timeout):
            return []

        # parse object file to find other objects
        parsed = dulwich.objects.ShaFile.from_path(local_copy)
        return get_referenced_sha1(parsed)

    def _download_object(self, url, filepath, destination, timeout):
        """Save ``url``/``filepath`` to ``destination``; return False if rejected."""
        response = self.session.get(
            "%s/%s" % (url, filepath),
            allow_redirects=False,
            timeout=timeout,
        )
        printf(
            "[-] Fetching %s/%s [%d]\n",
            url,
            filepath,
            response.status_code,
        )

        valid, error_message = verify_response(response)
        if not valid:
            printf(error_message, url, filepath, file=sys.stderr)
            return False

        create_intermediate_dirs(destination)

        # write file
        with open(destination, "wb") as f:
            f.write(response.content)

        return True
|
|
|
|
|
|
def sanitize_file(filepath):
    """Comment out, in place, the possibly unsafe settings of a config file.

    Every occurrence (case-insensitive) of ``fsmonitor``, ``sshcommand``,
    ``askpass``, ``editor`` or ``pager`` gets ``# `` inserted in front of it,
    which turns the rest of that line into a comment. When something was
    changed, a warning is printed and the file is rewritten.

    Raises AssertionError if ``filepath`` is not an existing file.
    """
    assert os.path.isfile(filepath), "%s is not a file" % filepath

    # Deliberately unanchored. A leading `^\s*` without re.MULTILINE only
    # matches at the very start of the file, which would leave `fsmonitor`
    # untouched everywhere else; and git accepts a variable on the same line
    # as its section header, so a key is not always first on its line.
    unsafe = re.compile(
        r"fsmonitor|sshcommand|askpass|editor|pager", re.IGNORECASE
    )

    with open(filepath, "r+") as f:
        content = f.read()
        modified_content = unsafe.sub(r"# \g<0>", content)

        if content != modified_content:
            printf("Warning: '%s' file was altered\n", filepath)
            f.seek(0)
            f.write(modified_content)
            # never leave stale bytes behind the rewritten content
            f.truncate()
|
|
|
|
|
|
def fetch_git(url, directory, jobs, retry, timeout, http_headers, client_cert_p12=None, client_cert_p12_password=None):
    """Dump the git repository exposed at ``url`` into ``directory``.

    ``url`` may point at the site root, at its ``.git`` directory or at
    ``.git/HEAD``. ``jobs`` is the number of parallel workers, ``retry`` the
    number of attempts per request, ``timeout`` the per-request timeout in
    seconds and ``http_headers`` the headers sent with every request. When
    ``client_cert_p12`` is given, that PKCS#12 certificate (unlocked with
    ``client_cert_p12_password``) is presented to the server.

    If the server lists the content of ``.git/``, the directory is mirrored
    recursively. Otherwise well-known files are requested, then refs, packs
    and objects are discovered from what was downloaded. In both cases the
    process finally changes its working directory to ``directory`` and runs
    ``git checkout .``.

    Returns 0 on completion and 1 when ``.git/HEAD`` cannot be retrieved or
    is not a git HEAD file. In directory-listing mode a failing checkout
    raises ``subprocess.CalledProcessError``; otherwise checkout errors are
    ignored.
    """
    assert os.path.isdir(directory), "%s is not a directory" % directory
    assert jobs >= 1, "invalid number of jobs"
    assert retry >= 1, "invalid number of retries"
    assert timeout >= 1, "invalid timeout"

    # find base url
    url = url.rstrip("/")
    if url.endswith("HEAD"):
        url = url[:-4]
    url = url.rstrip("/")
    if url.endswith(".git"):
        url = url[:-4]
    url = url.rstrip("/")

    session = requests.Session()
    session.verify = False
    session.headers = http_headers
    if client_cert_p12:
        # Pkcs12Adapter forwards the remaining keyword arguments to
        # HTTPAdapter, so the retry policy applies with a certificate too.
        adapter = Pkcs12Adapter(
            pkcs12_filename=client_cert_p12,
            pkcs12_password=client_cert_p12_password,
            max_retries=retry,
        )
    else:
        adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    # Mount on the normalised base url: an adapter only serves urls starting
    # with its prefix, and a prefix such as ".../.git/HEAD" would not cover
    # the ".git/" probe below.
    session.mount(url, adapter)

    if os.listdir(directory):
        printf("Warning: Destination '%s' is not empty\n", directory)

    # check for /.git/HEAD
    printf("[-] Testing %s/.git/HEAD ", url)
    response = session.get(
        "%s/.git/HEAD" % url,
        timeout=timeout,
        allow_redirects=False,
    )
    printf("[%d]\n", response.status_code)

    valid, error_message = verify_response(response)
    if not valid:
        printf(error_message, url, ".git/HEAD", file=sys.stderr)
        return 1
    elif not re.match(r"^(ref:.*|[0-9a-f]{40}$)", response.text.strip()):
        printf(
            "error: %s/.git/HEAD is not a git HEAD file\n",
            url,
            file=sys.stderr,
        )
        return 1

    # set up environment to ensure proxy usage
    environment = os.environ.copy()
    configured_proxy = socks.getdefaultproxy()
    if configured_proxy is not None:
        # Keyed by the PySocks constants rather than a positional list: the
        # constants do not start at 0. The schemes are the curl spellings
        # for "let the proxy resolve host names".
        proxy_schemes = {
            socks.PROXY_TYPE_HTTP: "http",
            socks.PROXY_TYPE_SOCKS4: "socks4a",
            socks.PROXY_TYPE_SOCKS5: "socks5h",
        }
        scheme = proxy_schemes.get(configured_proxy[0])
        if scheme is not None:
            # ALL_PROXY holds a plain proxy url
            environment["ALL_PROXY"] = "%s://%s:%s" % (
                scheme,
                configured_proxy[1],
                configured_proxy[2],
            )

    # check for directory listing
    printf("[-] Testing %s/.git/ ", url)
    response = session.get(
        "%s/.git/" % url,
        timeout=timeout,
        allow_redirects=False,
    )
    printf("[%d]\n", response.status_code)

    if (
        response.status_code == 200
        and is_html(response)
        and "HEAD" in get_indexed_files(response)
    ):
        printf("[-] Fetching .git recursively\n")
        # NOTE(review): the client certificate is not forwarded to the
        # recursive workers; RecursiveDownloadWorker.do_task has to accept
        # the two extra arguments before they can be added here.
        process_tasks(
            [".git/", ".gitignore"],
            RecursiveDownloadWorker,
            jobs,
            args=(url, directory, retry, timeout, http_headers),
        )

        os.chdir(directory)

        # sanitize_file() asserts that the file exists; a repository served
        # without a config file must not abort the dump.
        if os.path.isfile(".git/config"):
            printf("[-] Sanitizing .git/config\n")
            sanitize_file(".git/config")

        printf("[-] Running git checkout .\n")
        subprocess.check_call(["git", "checkout", "."], env=environment)
        return 0

    worker_args = (
        url,
        directory,
        retry,
        timeout,
        http_headers,
        client_cert_p12,
        client_cert_p12_password,
    )

    # no directory listing
    printf("[-] Fetching common files\n")
    tasks = [
        ".gitignore",
        ".git/COMMIT_EDITMSG",
        ".git/description",
        ".git/hooks/applypatch-msg.sample",
        ".git/hooks/commit-msg.sample",
        ".git/hooks/post-commit.sample",
        ".git/hooks/post-receive.sample",
        ".git/hooks/post-update.sample",
        ".git/hooks/pre-applypatch.sample",
        ".git/hooks/pre-commit.sample",
        ".git/hooks/pre-push.sample",
        ".git/hooks/pre-rebase.sample",
        ".git/hooks/pre-receive.sample",
        ".git/hooks/prepare-commit-msg.sample",
        ".git/hooks/update.sample",
        ".git/index",
        ".git/info/exclude",
        ".git/objects/info/packs",
    ]
    process_tasks(tasks, DownloadWorker, jobs, args=worker_args)

    # find refs
    printf("[-] Finding refs/\n")
    tasks = [
        ".git/FETCH_HEAD",
        ".git/HEAD",
        ".git/ORIG_HEAD",
        ".git/config",
        ".git/info/refs",
        ".git/logs/HEAD",
        ".git/logs/refs/heads/main",
        ".git/logs/refs/heads/master",
        ".git/logs/refs/heads/staging",
        ".git/logs/refs/heads/production",
        ".git/logs/refs/heads/development",
        ".git/logs/refs/remotes/origin/HEAD",
        ".git/logs/refs/remotes/origin/main",
        ".git/logs/refs/remotes/origin/master",
        ".git/logs/refs/remotes/origin/staging",
        ".git/logs/refs/remotes/origin/production",
        ".git/logs/refs/remotes/origin/development",
        ".git/logs/refs/stash",
        ".git/packed-refs",
        ".git/refs/heads/main",
        ".git/refs/heads/master",
        ".git/refs/heads/staging",
        ".git/refs/heads/production",
        ".git/refs/heads/development",
        ".git/refs/remotes/origin/HEAD",
        ".git/refs/remotes/origin/main",
        ".git/refs/remotes/origin/master",
        ".git/refs/remotes/origin/staging",
        ".git/refs/remotes/origin/production",
        ".git/refs/remotes/origin/development",
        ".git/refs/stash",
        ".git/refs/wip/wtree/refs/heads/main",
        ".git/refs/wip/wtree/refs/heads/master",
        ".git/refs/wip/wtree/refs/heads/staging",
        ".git/refs/wip/wtree/refs/heads/production",
        ".git/refs/wip/wtree/refs/heads/development",
        ".git/refs/wip/index/refs/heads/main",
        ".git/refs/wip/index/refs/heads/master",
        ".git/refs/wip/index/refs/heads/staging",
        ".git/refs/wip/index/refs/heads/production",
        ".git/refs/wip/index/refs/heads/development",
    ]
    process_tasks(tasks, FindRefsWorker, jobs, args=worker_args)

    # find packs
    printf("[-] Finding packs\n")
    tasks = []

    # use .git/objects/info/packs to find packs
    info_packs_path = os.path.join(
        directory, ".git", "objects", "info", "packs"
    )
    if os.path.exists(info_packs_path):
        # downloaded content is untrusted: never fail on undecodable bytes
        with open(info_packs_path, "r", errors="replace") as f:
            info_packs = f.read()

        for sha1 in re.findall(r"pack-([a-f0-9]{40})\.pack", info_packs):
            tasks.append(".git/objects/pack/pack-%s.idx" % sha1)
            tasks.append(".git/objects/pack/pack-%s.pack" % sha1)

    process_tasks(tasks, DownloadWorker, jobs, args=worker_args)

    # find objects
    printf("[-] Finding objects\n")
    objs = set()
    packed_objs = set()

    # .git/packed-refs, .git/info/refs, .git/refs/*, .git/logs/*
    files = [
        os.path.join(directory, ".git", "packed-refs"),
        os.path.join(directory, ".git", "info", "refs"),
        os.path.join(directory, ".git", "FETCH_HEAD"),
        os.path.join(directory, ".git", "ORIG_HEAD"),
    ]
    for subdir in ("refs", "logs"):
        for dirpath, _, filenames in os.walk(
            os.path.join(directory, ".git", subdir)
        ):
            for filename in filenames:
                files.append(os.path.join(dirpath, filename))

    for filepath in files:
        if not os.path.exists(filepath):
            continue

        # Only the hexadecimal ids matter here, so undecodable bytes (e.g.
        # in author names or messages) are replaced instead of aborting.
        with open(filepath, "r", errors="replace") as f:
            content = f.read()

        for match in re.findall(r"(^|\s)([a-f0-9]{40})($|\s)", content):
            objs.add(match[1])

    # use .git/index to find objects
    index_path = os.path.join(directory, ".git", "index")
    if os.path.exists(index_path):
        index = dulwich.index.Index(index_path)

        for entry in index.iterobjects():
            objs.add(entry[1].decode())

    # use packs to find more objects to fetch, and objects that are packed
    pack_file_dir = os.path.join(directory, ".git", "objects", "pack")
    if os.path.isdir(pack_file_dir):
        for filename in os.listdir(pack_file_dir):
            if filename.startswith("pack-") and filename.endswith(".pack"):
                pack_data_path = os.path.join(pack_file_dir, filename)
                pack_idx_path = os.path.join(
                    pack_file_dir, filename[:-5] + ".idx"
                )
                pack_data = dulwich.pack.PackData(pack_data_path, object_format=dulwich.object_format.DEFAULT_OBJECT_FORMAT)
                pack_idx = dulwich.pack.load_pack_index(pack_idx_path, object_format=dulwich.object_format.DEFAULT_OBJECT_FORMAT)
                pack = dulwich.pack.Pack.from_objects(pack_data, pack_idx)

                for obj_file in pack.iterobjects():
                    packed_objs.add(obj_file.sha().hexdigest())
                    objs |= set(get_referenced_sha1(obj_file))

    # fetch all objects; packed ones are already available locally
    printf("[-] Fetching objects\n")
    process_tasks(
        objs,
        FindObjectsWorker,
        jobs,
        args=worker_args,
        tasks_done=packed_objs,
    )

    # git checkout
    printf("[-] Running git checkout .\n")
    os.chdir(directory)

    # the config file may not have been retrievable
    if os.path.isfile(".git/config"):
        sanitize_file(".git/config")

    # ignore errors; DEVNULL avoids leaking an open file handle
    subprocess.call(
        ["git", "checkout", "."],
        stderr=subprocess.DEVNULL,
        env=environment,
    )

    return 0
|
|
|
|
|
|
def main():
    """Command-line entry point: parse the arguments and run ``fetch_git``.

    Invalid arguments are reported through ``parser.error`` (which exits
    with status 2). The output directory is created when it does not exist.
    The process always terminates through ``sys.exit`` with the value
    returned by ``fetch_git``.
    """
    parser = argparse.ArgumentParser(
        usage="git-dumper [options] URL DIR",
        description="Dump a git repository from a website.",
    )
    parser.add_argument("url", metavar="URL", help="url")
    parser.add_argument("directory", metavar="DIR", help="output directory")
    parser.add_argument("--proxy", help="use the specified proxy")
    parser.add_argument("--client-cert-p12", help="client certificate in PKCS#12")
    parser.add_argument("--client-cert-p12-password", help="password for the client certificate")
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=10,
        help="number of simultaneous requests",
    )
    parser.add_argument(
        "-r",
        "--retry",
        type=int,
        default=3,
        help="number of request attempts before giving up",
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=3,
        help="maximum time in seconds before giving up",
    )
    parser.add_argument(
        "-u",
        "--user-agent",
        type=str,
        default="Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0",
        help="user-agent to use for requests",
    )
    parser.add_argument(
        "-H",
        "--header",
        type=str,
        action="append",
        help="additional http headers, e.g `NAME=VALUE`",
    )
    args = parser.parse_args()

    # jobs
    if args.jobs < 1:
        parser.error("invalid number of jobs, got `%d`" % args.jobs)

    # retry
    if args.retry < 1:
        parser.error("invalid number of retries, got `%d`" % args.retry)

    # timeout
    if args.timeout < 1:
        parser.error("invalid timeout, got `%d`" % args.timeout)

    # header
    http_headers = {"User-Agent": args.user_agent}
    for header in args.header or ():
        tokens = header.split("=", maxsplit=1)
        if len(tokens) != 2:
            parser.error(
                "http header must have the form NAME=VALUE, got `%s`"
                % header
            )
        name, value = tokens
        http_headers[name.strip()] = value.strip()

    # proxy
    if args.proxy:
        # Tried in order; the last pattern makes SOCKS5 the default for a
        # bare HOST:PORT. The optional `//` lets the usual url spelling
        # (socks5://HOST:PORT) work next to socks5:HOST:PORT instead of
        # producing a host name that starts with `//`.
        proxy_patterns = [
            (r"^socks5:(?://)?(.*):(\d+)$", socks.PROXY_TYPE_SOCKS5),
            (r"^socks4:(?://)?(.*):(\d+)$", socks.PROXY_TYPE_SOCKS4),
            (r"^http://(.*):(\d+)$", socks.PROXY_TYPE_HTTP),
            (r"^(.*):(\d+)$", socks.PROXY_TYPE_SOCKS5),
        ]

        for pattern, proxy_type in proxy_patterns:
            m = re.match(pattern, args.proxy)
            if m:
                socks.setdefaultproxy(proxy_type, m.group(1), int(m.group(2)))
                # route every socket created from now on through the proxy
                socket.socket = socks.socksocket
                break
        else:
            parser.error("invalid proxy, got `%s`" % args.proxy)

    # output directory
    if not os.path.exists(args.directory):
        os.makedirs(args.directory)

    if not os.path.isdir(args.directory):
        parser.error("`%s` is not a directory" % args.directory)

    # client certificate
    if args.client_cert_p12:
        if not os.path.exists(args.client_cert_p12):
            parser.error(
                "client certificate `%s` does not exist" % args.client_cert_p12
            )

        if not os.path.isfile(args.client_cert_p12):
            parser.error(
                "client certificate `%s` is not a file" % args.client_cert_p12
            )

        if args.client_cert_p12_password is None:
            parser.error("client certificate password is required")

    # certificate verification is disabled on purpose; silence the warning
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # fetch everything
    sys.exit(
        fetch_git(
            args.url,
            args.directory,
            args.jobs,
            args.retry,
            args.timeout,
            http_headers,
            args.client_cert_p12,
            args.client_cert_p12_password,
        )
    )
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
|