Compare commits

...

1 Commits

Author SHA1 Message Date
panni fb8bfeb044 untested WIP. 2020-06-06 04:24:03 +02:00
17 changed files with 15647 additions and 574 deletions
+517 -153
View File
@@ -2,6 +2,20 @@ import logging
import re
import sys
import ssl
import requests
try:
import copyreg
except ImportError:
import copy_reg as copyreg
try:
from HTMLParser import HTMLParser
except ImportError:
if sys.version_info >= (3, 4):
import html
else:
from html.parser import HTMLParser
from copy import deepcopy
from time import sleep
@@ -9,9 +23,17 @@ from collections import OrderedDict
from requests.sessions import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from .exceptions import (
CloudflareLoopProtection,
CloudflareCode1020,
CloudflareIUAMError,
CloudflareReCaptchaError,
CloudflareReCaptchaProvider
)
from .interpreters import JavaScriptInterpreter
from .reCaptcha import reCaptcha
from .user_agent import User_Agent
try:
@@ -25,219 +47,540 @@ except ImportError:
pass
try:
from urlparse import urlparse
from urlparse import urlunparse
from urlparse import urlparse, urljoin
except ImportError:
from urllib.parse import urlparse
from urllib.parse import urlunparse
from urllib.parse import urlparse, urljoin
##########################################################################################################################################################
__version__ = '1.1.9'
# ------------------------------------------------------------------------------- #
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
__version__ = '1.2.31'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class CipherSuiteAdapter(HTTPAdapter):
def __init__(self, cipherSuite=None, **kwargs):
self.cipherSuite = cipherSuite
__attrs__ = [
'ssl_context',
'max_retries',
'config',
'_pool_connections',
'_pool_maxsize',
'_pool_block'
]
if hasattr(ssl, 'PROTOCOL_TLS'):
self.ssl_context = create_urllib3_context(
ssl_version=getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2),
ciphers=self.cipherSuite
)
else:
self.ssl_context = create_urllib3_context(ssl_version=ssl.PROTOCOL_TLSv1)
def __init__(self, *args, **kwargs):
self.ssl_context = kwargs.pop('ssl_context', None)
self.cipherSuite = kwargs.pop('cipherSuite', None)
if not self.ssl_context:
self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
self.ssl_context.set_ciphers(self.cipherSuite)
self.ssl_context.options |= (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)
super(CipherSuiteAdapter, self).__init__(**kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def init_poolmanager(self, *args, **kwargs):
    """Create the connection pool manager using this adapter's SSL context.

    Any ``ssl_context`` keyword supplied by the caller is overwritten with
    ``self.ssl_context`` before the call is delegated to the parent adapter.
    """
    kwargs.update(ssl_context=self.ssl_context)
    parentAdapter = super(CipherSuiteAdapter, self)
    return parentAdapter.init_poolmanager(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def proxy_manager_for(self, *args, **kwargs):
    """Return the proxy manager, built with this adapter's SSL context.

    Any ``ssl_context`` keyword supplied by the caller is overwritten with
    ``self.ssl_context`` before the call is delegated to the parent adapter.
    """
    kwargs.update(ssl_context=self.ssl_context)
    parentAdapter = super(CipherSuiteAdapter, self)
    return parentAdapter.proxy_manager_for(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class CloudScraper(Session):
def __init__(self, *args, **kwargs):
self.debug = kwargs.pop('debug', False)
self.delay = kwargs.pop('delay', None)
self.interpreter = kwargs.pop('interpreter', 'js2py')
self.allow_brotli = kwargs.pop('allow_brotli', True if 'brotli' in sys.modules.keys() else False)
self.cipherSuite = None
self.cipherSuite = kwargs.pop('cipherSuite', None)
self.interpreter = kwargs.pop('interpreter', 'native')
self.recaptcha = kwargs.pop('recaptcha', {})
self.allow_brotli = kwargs.pop(
'allow_brotli',
True if 'brotli' in sys.modules.keys() else False
)
self.user_agent = User_Agent(
allow_brotli=self.allow_brotli,
browser=kwargs.pop('browser', None)
)
self._solveDepthCnt = 0
self.solveDepth = kwargs.pop('solveDepth', 3)
super(CloudScraper, self).__init__(*args, **kwargs)
# pylint: disable=E0203
if 'requests' in self.headers['User-Agent']:
# ------------------------------------------------------------------------------- #
# Set a random User-Agent if no custom User-Agent has been set
self.headers = User_Agent(allow_brotli=self.allow_brotli).headers
# ------------------------------------------------------------------------------- #
self.headers = self.user_agent.headers
if not self.cipherSuite:
self.cipherSuite = self.user_agent.cipherSuite
self.mount('https://', CipherSuiteAdapter(self.loadCipherSuite()))
if isinstance(self.cipherSuite, list):
self.cipherSuite = ':'.join(self.cipherSuite)
##########################################################################################################################################################
self.mount(
'https://',
CipherSuiteAdapter(
cipherSuite=self.cipherSuite
)
)
# purely to allow us to pickle dump
copyreg.pickle(ssl.SSLContext, lambda obj: (obj.__class__, (obj.protocol,)))
# ------------------------------------------------------------------------------- #
# Allow us to pickle our session back with all variables
# ------------------------------------------------------------------------------- #
def __getstate__(self):
return self.__dict__
# ------------------------------------------------------------------------------- #
# Raise an Exception with no stacktrace and reset depth counter.
# ------------------------------------------------------------------------------- #
def simpleException(self, exception, msg):
    """Reset the solve-depth counter and raise ``exception(msg)``.

    :param exception: exception class (or any callable returning an
        exception instance) to raise.
    :param msg: message passed to ``exception``.

    Note: ``sys.tracebacklimit`` is set to 0 here and is not restored, so
    the traceback suppression applies to the whole interpreter afterwards.
    """
    sys.tracebacklimit = 0
    self._solveDepthCnt = 0
    raise exception(msg)
# ------------------------------------------------------------------------------- #
# debug the request via the response
# ------------------------------------------------------------------------------- #
@staticmethod
def debugRequest(req):
try:
print(dump.dump_all(req).decode('utf-8'))
except: # noqa
pass
except ValueError as e:
print("Debug Error: {}".format(getattr(e, 'message', e)))
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Unescape / decode html entities
# ------------------------------------------------------------------------------- #
def loadCipherSuite(self):
if self.cipherSuite:
return self.cipherSuite
@staticmethod
def unescape(html_text):
if sys.version_info >= (3, 0):
if sys.version_info >= (3, 4):
return html.unescape(html_text)
self.cipherSuite = ''
return HTMLParser().unescape(html_text)
if hasattr(ssl, 'PROTOCOL_TLS'):
ciphers = [
'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-CHACHA20-POLY1305-SHA256', 'ECDHE-RSA-CHACHA20-POLY1305-SHA256',
'ECDHE-RSA-AES128-CBC-SHA', 'ECDHE-RSA-AES256-CBC-SHA', 'RSA-AES128-GCM-SHA256', 'RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-GCM-SHA256', 'RSA-AES256-SHA', '3DES-EDE-CBC'
]
return HTMLParser().unescape(html_text)
if hasattr(ssl, 'PROTOCOL_TLSv1_3'):
ciphers.insert(0, ['GREASE_3A', 'GREASE_6A', 'AES128-GCM-SHA256', 'AES256-GCM-SHA256', 'AES256-GCM-SHA384', 'CHACHA20-POLY1305-SHA256'])
# ------------------------------------------------------------------------------- #
# Decode Brotli on older versions of urllib3 manually
# ------------------------------------------------------------------------------- #
ctx = ssl.SSLContext(getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2))
for cipher in ciphers:
try:
ctx.set_ciphers(cipher)
self.cipherSuite = '{}:{}'.format(self.cipherSuite, cipher).rstrip(':')
except ssl.SSLError:
pass
return self.cipherSuite
##########################################################################################################################################################
def request(self, method, url, *args, **kwargs):
ourSuper = super(CloudScraper, self)
resp = ourSuper.request(method, url, *args, **kwargs)
if resp.headers.get('Content-Encoding') == 'br':
def decodeBrotli(self, resp):
if requests.packages.urllib3.__version__ < '1.25.1' and resp.headers.get('Content-Encoding') == 'br':
if self.allow_brotli and resp._content:
resp._content = brotli.decompress(resp.content)
else:
logging.warning('Brotli content detected, But option is disabled, we will not continue.')
return resp
logging.warning(
'You\'re running urllib3 {}, Brotli content detected, '
'Which requires manual decompression, '
'But option allow_brotli is set to False, '
'We will not continue to decompress.'.format(requests.packages.urllib3.__version__)
)
return resp
# ------------------------------------------------------------------------------- #
# Our hijacker request function
# ------------------------------------------------------------------------------- #
def request(self, method, url, *args, **kwargs):
# pylint: disable=E0203
if kwargs.get('proxies') and kwargs.get('proxies') != self.proxies:
self.proxies = kwargs.get('proxies')
resp = self.decodeBrotli(
super(CloudScraper, self).request(method, url, *args, **kwargs)
)
# ------------------------------------------------------------------------------- #
# Debug request
# ------------------------------------------------------------------------------- #
if self.debug:
self.debugRequest(resp)
# Check if Cloudflare anti-bot is on
if self.isChallengeRequest(resp):
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.
self.request('GET', resp.url)
resp = ourSuper.request(method, url, *args, **kwargs)
else:
# Solve Challenge
resp = self.sendChallengeResponse(resp, **kwargs)
if self.is_Challenge_Request(resp):
# ------------------------------------------------------------------------------- #
# Try to solve the challenge and send it back
# ------------------------------------------------------------------------------- #
if self._solveDepthCnt >= self.solveDepth:
_ = self._solveDepthCnt
self.simpleException(
CloudflareLoopProtection,
"!!Loop Protection!! We have tried to solve {} time(s) in a row.".format(_)
)
self._solveDepthCnt += 1
resp = self.Challenge_Response(resp, **kwargs)
else:
if not resp.is_redirect and resp.status_code not in [429, 503]:
self._solveDepthCnt = 0
return resp
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def isChallengeRequest(resp):
if resp.headers.get('Server', '').startswith('cloudflare'):
if b'why_captcha' in resp.content or b'/cdn-cgi/l/chk_captcha' in resp.content:
raise ValueError('Captcha')
def is_IUAM_Challenge(resp):
try:
return (
resp.status_code in [429, 503]
and all(s in resp.content for s in [b'jschl_vc', b'jschl_answer'])
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'action="/.*?__cf_chl_jschl_tk__=\S+".*?name="jschl_vc"\svalue=.*?',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
##########################################################################################################################################################
def sendChallengeResponse(self, resp, **original_kwargs):
body = resp.text
# Cloudflare requires a delay before solving the challenge
if not self.delay:
try:
delay = float(re.search(r'submit\(\);\r?\n\s*},\s*([0-9]+)', body).group(1)) / float(1000)
if isinstance(delay, (int, float)):
self.delay = delay
except: # noqa
pass
sleep(self.delay)
parsed_url = urlparse(resp.url)
domain = parsed_url.netloc
submit_url = '{}://{}/cdn-cgi/l/chk_jschl'.format(parsed_url.scheme, domain)
cloudflare_kwargs = deepcopy(original_kwargs)
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare reCaptcha challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_reCaptcha_Challenge(resp):
try:
params = OrderedDict()
s = re.search(r'name="s"\svalue="(?P<s_value>[^"]+)', body)
if s:
params['s'] = s.group('s_value')
params.update(
[
('jschl_vc', re.search(r'name="jschl_vc" value="(\w+)"', body).group(1)),
('pass', re.search(r'name="pass" value="(.+?)"', body).group(1))
]
)
params = cloudflare_kwargs.setdefault('params', params)
except Exception as e:
raise ValueError('Unable to parse Cloudflare anti-bots page: {} {}'.format(e.message, BUG_REPORT))
# Solve the Javascript challenge
params['jschl_answer'] = JavaScriptInterpreter.dynamicImport(self.interpreter).solveChallenge(body, domain)
# Requests transforms any request into a GET after a redirect,
# so the redirect has to be handled manually here to allow for
# performing other types of requests even as the first request.
cloudflare_kwargs['allow_redirects'] = False
redirect = self.request(resp.request.method, submit_url, **cloudflare_kwargs)
redirect_location = urlparse(redirect.headers['Location'])
if not redirect_location.netloc:
redirect_url = urlunparse(
(
parsed_url.scheme,
domain,
redirect_location.path,
redirect_location.params,
redirect_location.query,
redirect_location.fragment
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code == 403
and re.search(
r'action="/.*?__cf_chl_captcha_tk__=\S+".*?data\-sitekey=.*?',
resp.text,
re.M | re.DOTALL
)
)
return self.request(resp.request.method, redirect_url, **original_kwargs)
except AttributeError:
pass
return self.request(resp.request.method, redirect.headers['Location'], **original_kwargs)
return False
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# check if the response contains Firewall 1020 Error
# ------------------------------------------------------------------------------- #
@staticmethod
def is_Firewall_Blocked(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code == 403
and re.search(
r'<span class="cf-error-code">1020</span>',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# Wrapper for is_reCaptcha_Challenge, is_IUAM_Challenge, is_Firewall_Blocked
# ------------------------------------------------------------------------------- #
def is_Challenge_Request(self, resp):
    """Report whether ``resp`` is a Cloudflare challenge that should be solved.

    A firewall (code 1020) block is reported through ``self.simpleException``
    with ``CloudflareCode1020`` before the challenge checks run.

    :returns: ``True`` when the response is a reCaptcha or an IUAM
        challenge, ``False`` otherwise.
    """
    if self.is_Firewall_Blocked(resp):
        self.simpleException(
            CloudflareCode1020,
            'Cloudflare has blocked this request (Code 1020 Detected).'
        )

    # `or` keeps the short-circuit: the IUAM check only runs when the
    # reCaptcha check is falsy.
    return bool(self.is_reCaptcha_Challenge(resp) or self.is_IUAM_Challenge(resp))
# ------------------------------------------------------------------------------- #
# Try to solve cloudflare javascript challenge.
# ------------------------------------------------------------------------------- #
def IUAM_Challenge_Response(self, body, url, interpreter):
    """Build the submission URL and form data for a Cloudflare IUAM challenge.

    :param body: HTML text of the challenge page.
    :param url: URL the challenge page came from; supplies scheme and host.
    :param interpreter: name passed to ``JavaScriptInterpreter.dynamicImport``.
    :returns: dict with ``'url'`` (scheme + host + unescaped form action)
        and ``'data'`` (``OrderedDict`` of the ``r`` / ``jschl_vc`` /
        ``pass`` inputs found in the form, plus ``jschl_answer``).

    Extraction and solver failures are reported through
    ``self.simpleException`` with ``CloudflareIUAMError``.
    """
    extractionError = "Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."

    try:
        formMatch = re.search(
            r'<form (?P<form>id="challenge-form" action="(?P<challengeUUID>.*?'
            r'__cf_chl_jschl_tk__=\S+)"(.*?)</form>)',
            body,
            re.M | re.DOTALL
        )
        # When no form is found formMatch is None and groupdict() raises
        # AttributeError, which is handled below.
        formFields = formMatch.groupdict()

        if 'form' not in formFields or 'challengeUUID' not in formFields:
            self.simpleException(CloudflareIUAMError, extractionError)

        hiddenInputs = re.findall(
            r'name="(r|jschl_vc|pass)"\svalue="(.*?)"',
            formFields['form']
        )
        answerData = OrderedDict(hiddenInputs)
    except AttributeError:
        self.simpleException(CloudflareIUAMError, extractionError)

    target = urlparse(url)

    try:
        solver = JavaScriptInterpreter.dynamicImport(interpreter)
        answerData['jschl_answer'] = solver.solveChallenge(body, target.netloc)
    except Exception as e:
        self.simpleException(
            CloudflareIUAMError,
            'Unable to parse Cloudflare anti-bots page: {}'.format(
                getattr(e, 'message', e)
            )
        )

    actionPath = self.unescape(formFields['challengeUUID'])
    return {
        'url': '{}://{}{}'.format(target.scheme, target.netloc, actionPath),
        'data': answerData
    }
# ------------------------------------------------------------------------------- #
# Try to solve the reCaptcha challenge via 3rd party.
# ------------------------------------------------------------------------------- #
def reCaptcha_Challenge_Response(self, provider, provider_params, body, url):
    """Build the submission URL and form data for a Cloudflare reCaptcha challenge.

    :param provider: provider name; lower-cased and passed to
        ``reCaptcha.dynamicImport``.
    :param provider_params: passed through to the provider's ``solveCaptcha``.
    :param body: HTML text of the challenge page.
    :param url: URL the challenge page came from; supplies scheme and host.
    :returns: dict with ``'url'`` (scheme + host + unescaped form action)
        and ``'data'`` (``OrderedDict`` with ``r``, ``id`` and
        ``g-recaptcha-response``).

    Extraction failures are reported through ``self.simpleException`` with
    ``CloudflareReCaptchaError``. A form without ``data-sitekey`` raises
    ``KeyError``.
    """
    extractionError = "Cloudflare reCaptcha detected, unfortunately we can't extract the parameters correctly."

    try:
        formMatch = re.search(
            r'<form class="challenge-form" (?P<form>id="challenge-form" '
            r'action="(?P<challengeUUID>.*?__cf_chl_captcha_tk__=\S+)"(.*?)</form>)',
            body,
            re.M | re.DOTALL
        )
        # When no form is found formMatch is None and groupdict() raises
        # AttributeError, which is handled below.
        formFields = formMatch.groupdict()

        if 'form' not in formFields or 'challengeUUID' not in formFields:
            self.simpleException(CloudflareReCaptchaError, extractionError)

        formAttributes = OrderedDict(
            re.findall(
                r'(name="r"\svalue|data-ray|data-sitekey)="(.*?)"',
                formFields['form']
            )
        )
    except (AttributeError):
        self.simpleException(CloudflareReCaptchaError, extractionError)

    target = urlparse(url)
    submitUrl = '{}://{}{}'.format(
        target.scheme,
        target.netloc,
        self.unescape(formFields['challengeUUID'])
    )

    formData = OrderedDict()
    formData['r'] = formAttributes.get('name="r" value', '')
    formData['id'] = formAttributes.get('data-ray')

    solver = reCaptcha.dynamicImport(provider.lower())
    formData['g-recaptcha-response'] = solver.solveCaptcha(
        url,
        formAttributes['data-sitekey'],
        provider_params
    )

    return {
        'url': submitUrl,
        'data': formData
    }
# ------------------------------------------------------------------------------- #
# Attempt to handle and send the challenge response back to cloudflare
# ------------------------------------------------------------------------------- #
# Solve the Cloudflare challenge contained in `resp` and return the follow-up response.
#
# resp:   the challenge response (reCaptcha or IUAM page).
# kwargs: keyword arguments of the original request; they are reused for the
#         re-request, the challenge submission and the redirect follow-up.
#
# Returns the response obtained after submitting the challenge answer, or the
# response itself in the early-exit cases noted below. Errors are reported via
# self.simpleException (CloudflareReCaptchaProvider / CloudflareIUAMError).
def Challenge_Response(self, resp, **kwargs):
if self.is_reCaptcha_Challenge(resp):
# ------------------------------------------------------------------------------- #
# double down on the request as some websites are only checking
# if cfuid is populated before issuing reCaptcha.
# ------------------------------------------------------------------------------- #
# The re-request goes straight to the parent Session.request, so it does not
# pass through this class's own request() challenge handling.
resp = self.decodeBrotli(
super(CloudScraper, self).request(resp.request.method, resp.url, **kwargs)
)
# The second attempt is no longer a reCaptcha page: hand it back as-is.
if not self.is_reCaptcha_Challenge(resp):
return resp
# ------------------------------------------------------------------------------- #
# if no reCaptcha provider is configured, raise CloudflareReCaptchaProvider.
# ------------------------------------------------------------------------------- #
if not self.recaptcha or not isinstance(self.recaptcha, dict) or not self.recaptcha.get('provider'):
self.simpleException(
CloudflareReCaptchaProvider,
"Cloudflare reCaptcha detected, unfortunately you haven't loaded an anti reCaptcha provider "
"correctly via the 'recaptcha' parameter."
)
# ------------------------------------------------------------------------------- #
# if provider is return_response, return the response without doing anything.
# ------------------------------------------------------------------------------- #
if self.recaptcha.get('provider') == 'return_response':
return resp
# Note: this writes into the caller-supplied recaptcha dict; the whole dict is
# then passed to the provider as its parameters.
self.recaptcha['proxies'] = self.proxies
submit_url = self.reCaptcha_Challenge_Response(
self.recaptcha.get('provider'),
self.recaptcha,
resp.text,
resp.url
)
else:
# ------------------------------------------------------------------------------- #
# Cloudflare requires a delay before solving the challenge
# ------------------------------------------------------------------------------- #
# A delay of None or 0 counts as unset. The value parsed from the page is in
# milliseconds and is converted to seconds; it is stored on the instance, so
# later challenges reuse it instead of parsing the page again.
if not self.delay:
try:
delay = float(
re.search(
r'submit\(\);\r?\n\s*},\s*([0-9]+)',
resp.text
).group(1)
) / float(1000)
if isinstance(delay, (int, float)):
self.delay = delay
# AttributeError: the pattern did not match (re.search returned None).
except (AttributeError, ValueError):
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM possibility malformed, issue extracing delay value."
)
sleep(self.delay)
# ------------------------------------------------------------------------------- #
submit_url = self.IUAM_Challenge_Response(
resp.text,
resp.url,
self.interpreter
)
# ------------------------------------------------------------------------------- #
# Send the Challenge Response back to Cloudflare
# ------------------------------------------------------------------------------- #
# Despite its name, submit_url is a dict with 'url' and 'data' keys.
if submit_url:
# Merge newValue into obj[name] and return the merged mapping. If the key is
# missing (KeyError) or the existing value has no update() (AttributeError),
# the existing value is replaced by a fresh dict containing only newValue.
def updateAttr(obj, name, newValue):
try:
obj[name].update(newValue)
return obj[name]
except (AttributeError, KeyError):
obj[name] = {}
obj[name].update(newValue)
return obj[name]
# Work on a deep copy so the caller's kwargs are left untouched.
cloudflare_kwargs = deepcopy(kwargs)
# Redirects are disabled so the redirect can be followed manually below
# with the original request method.
cloudflare_kwargs['allow_redirects'] = False
cloudflare_kwargs['data'] = updateAttr(
cloudflare_kwargs,
'data',
submit_url['data']
)
urlParsed = urlparse(resp.url)
cloudflare_kwargs['headers'] = updateAttr(
cloudflare_kwargs,
'headers',
{
'Origin': '{}://{}'.format(urlParsed.scheme, urlParsed.netloc),
'Referer': resp.url
}
)
# The submission goes through self.request, i.e. back through this class's
# own challenge detection.
challengeSubmitResponse = self.request(
'POST',
submit_url['url'],
**cloudflare_kwargs
)
# ------------------------------------------------------------------------------- #
# Return response if Cloudflare is doing content pass through instead of 3xx
# else request with redirect URL also handle protocol scheme change http -> https
# ------------------------------------------------------------------------------- #
if not challengeSubmitResponse.is_redirect:
return challengeSubmitResponse
else:
# Start again from the original kwargs (redirects no longer disabled, no
# challenge form data) and only add the Referer header.
cloudflare_kwargs = deepcopy(kwargs)
cloudflare_kwargs['headers'] = updateAttr(
cloudflare_kwargs,
'headers',
{'Referer': challengeSubmitResponse.url}
)
# A Location without a host is relative: resolve it against the submission URL.
if not urlparse(challengeSubmitResponse.headers['Location']).netloc:
redirect_location = urljoin(
challengeSubmitResponse.url,
challengeSubmitResponse.headers['Location']
)
else:
redirect_location = challengeSubmitResponse.headers['Location']
return self.request(
resp.request.method,
redirect_location,
**cloudflare_kwargs
)
# ------------------------------------------------------------------------------- #
# We shouldn't be here...
# Re-request the original query and/or process again....
# ------------------------------------------------------------------------------- #
# Only reached when submit_url is falsy.
return self.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- #
@classmethod
def create_scraper(cls, sess=None, **kwargs):
@@ -247,24 +590,30 @@ class CloudScraper(Session):
scraper = cls(**kwargs)
if sess:
attrs = ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']
for attr in attrs:
for attr in ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']:
val = getattr(sess, attr, None)
if val:
setattr(scraper, attr, val)
return scraper
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Functions for integrating cloudscraper with other applications and scripts
# ------------------------------------------------------------------------------- #
@classmethod
def get_tokens(cls, url, **kwargs):
scraper = cls.create_scraper(
debug=kwargs.pop('debug', False),
delay=kwargs.pop('delay', None),
interpreter=kwargs.pop('interpreter', 'js2py'),
allow_brotli=kwargs.pop('allow_brotli', True),
**{
field: kwargs.pop(field, None) for field in [
'allow_brotli',
'browser',
'debug',
'delay',
'interpreter',
'recaptcha'
] if field in kwargs
}
)
try:
@@ -283,7 +632,11 @@ class CloudScraper(Session):
cookie_domain = d
break
else:
raise ValueError('Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?')
cls.simpleException(
CloudflareIUAMError,
"Unable to find Cloudflare cookies. Does the site actually "
"have Cloudflare IUAM (I'm Under Attack Mode) enabled?"
)
return (
{
@@ -293,7 +646,7 @@ class CloudScraper(Session):
scraper.headers['User-Agent']
)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
@classmethod
def get_cookie_string(cls, url, **kwargs):
@@ -304,7 +657,18 @@ class CloudScraper(Session):
return '; '.join('='.join(pair) for pair in tokens.items()), user_agent
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Import-time notice: printed once when the linked OpenSSL is older than 1.1.1.
if ssl.OPENSSL_VERSION_INFO < (1, 1, 1):
    print(
        ''.join([
            "DEPRECATION: The OpenSSL being used by this python install ({}) does not meet the minimum supported ",
            "version (>= OpenSSL 1.1.1) in order to support TLS 1.3 required by Cloudflare, ",
            "You may encounter an unexpected reCaptcha or cloudflare 1020 blocks."
        ]).format(ssl.OPENSSL_VERSION)
    )
# ------------------------------------------------------------------------------- #
create_scraper = CloudScraper.create_scraper
get_tokens = CloudScraper.get_tokens
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------- #
"""
cloudscraper.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of cloudscraper exceptions.
"""
# ------------------------------------------------------------------------------- #
class CloudflareException(Exception):
    """
    Base exception class for cloudscraper's Cloudflare-related errors.
    """
class CloudflareLoopProtection(CloudflareException):
    """
    Raised for recursive depth protection, i.e. when challenge solving
    has been attempted too many times in a row.
    """
class CloudflareCode1020(CloudflareException):
    """
    Raised when Cloudflare blocks the request with a code 1020 error page.
    """
class CloudflareIUAMError(CloudflareException):
    """
    Raised when there is a problem extracting the IUAM parameters
    from the Cloudflare payload.
    """
class CloudflareReCaptchaError(CloudflareException):
    """
    Raised when there is a problem extracting the reCaptcha parameters
    from the Cloudflare payload.
    """
class CloudflareReCaptchaProvider(CloudflareException):
    """
    Raised when Cloudflare presents a reCaptcha but no reCaptcha provider
    has been loaded.
    """
# ------------------------------------------------------------------------------- #
class reCaptchaException(Exception):
    """
    Base exception class for cloudscraper reCaptcha providers.
    """
class reCaptchaServiceUnavailable(reCaptchaException):
    """
    Raised when an external service cannot be reached.
    """
class reCaptchaAPIError(reCaptchaException):
    """
    Raised when the API response reports an error.
    """
class reCaptchaAccountError(reCaptchaException):
    """
    Raised for a problem with the reCaptcha provider account.
    """
class reCaptchaTimeout(reCaptchaException):
    """
    Raised when the reCaptcha provider takes too long.
    """
class reCaptchaParameter(reCaptchaException):
    """
    Raised for a bad or missing parameter.
    """
class reCaptchaBadJobID(reCaptchaException):
    """
    Raised for an invalid job id.
    """
class reCaptchaReportError(reCaptchaException):
    """
    Raised when the reCaptcha provider is unable to report a bad solve.
    """
@@ -1,4 +1,3 @@
import re
import sys
import logging
import abc
@@ -8,20 +7,24 @@ if sys.version_info >= (3, 4):
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
interpreters = {}
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
# ------------------------------------------------------------------------------- #
class JavaScriptInterpreter(ABC):
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def __init__(self, name):
interpreters[name] = self
# ------------------------------------------------------------------------------- #
@classmethod
def dynamicImport(cls, name):
if name not in interpreters:
@@ -35,55 +38,17 @@ class JavaScriptInterpreter(ABC):
return interpreters[name]
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def eval(self, jsEnv, js):
pass
# ------------------------------------------------------------------------------- #
def solveChallenge(self, body, domain):
try:
js = re.search(
r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
body
).group(1)
except Exception:
raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))
js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
js += '\na.value;'
jsEnv = '''
String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
var document = {{
createElement: function () {{
return {{ firstChild: {{ href: "https://{domain}/" }} }}
}},
getElementById: function () {{
return {{"innerHTML": "{innerHTML}"}};
}}
}};
'''
try:
innerHTML = re.search(
r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
body,
re.MULTILINE | re.DOTALL
)
innerHTML = innerHTML.group(2) if innerHTML else ''
except: # noqa
logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
try:
result = self.eval(
re.sub(r'\s{2,}', ' ', jsEnv.format(domain=domain, innerHTML=innerHTML), flags=re.MULTILINE | re.DOTALL),
js
)
float(result)
return float(self.eval(body, domain))
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
return result
@@ -0,0 +1,103 @@
from __future__ import absolute_import
import os
import sys
import ctypes.util
from ctypes import c_void_p, c_size_t, byref, create_string_buffer, CDLL
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """JavaScript interpreter backend that runs the challenge in ChakraCore via ctypes.

    The instance registers itself under the name ``'chakracore'``.
    """

    # ------------------------------------------------------------------------------- #

    def __init__(self):
        super(ChallengeInterpreter, self).__init__('chakracore')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """Run the script built by ``template(body, domain)`` in ChakraCore.

        :param body: passed to ``template`` (the challenge page body).
        :param domain: passed to ``template``.
        :returns: the script result converted to a string by ChakraCore,
            as the byte string held in the ctypes result buffer.
        :raises RuntimeError: when the ChakraCore library cannot be found
            or cannot be loaded.

        Note: ``sys.tracebacklimit`` is set to 0 before a ``RuntimeError``
        is raised and is not restored.
        Return codes of the ChakraCore calls are not checked.
        """
        chakraCoreLibrary = None

        # check current working directory.
        for _libraryFile in ['libChakraCore.so', 'libChakraCore.dylib', 'ChakraCore.dll']:
            _candidate = os.path.join(os.getcwd(), _libraryFile)
            if os.path.isfile(_candidate):
                chakraCoreLibrary = _candidate
                # Stop at the first match. The previous `continue` was a
                # no-op, so a later file in the list silently overrode an
                # earlier one.
                break

        # Fall back to the system library search paths.
        if not chakraCoreLibrary:
            chakraCoreLibrary = ctypes.util.find_library('ChakraCore')

        if not chakraCoreLibrary:
            sys.tracebacklimit = 0
            raise RuntimeError(
                'ChakraCore library not found in current path or any of your system library paths, '
                'please download from https://www.github.com/VeNoMouS/cloudscraper/tree/ChakraCore/, '
                'or https://github.com/Microsoft/ChakraCore/'
            )

        try:
            chakraCore = CDLL(chakraCoreLibrary)
        except OSError:
            sys.tracebacklimit = 0
            raise RuntimeError('There was an error loading the ChakraCore library {}'.format(chakraCoreLibrary))

        # Outside Windows the library entry point is invoked by hand.
        # NOTE(review): 1 and 2 are presumably the process-attach and
        # thread-attach reasons -- confirm against the ChakraCore docs.
        if sys.platform != 'win32':
            chakraCore.DllMain(0, 1, 0)
            chakraCore.DllMain(0, 2, 0)

        script = create_string_buffer(template(body, domain).encode('utf-16'))

        runtime = c_void_p()
        chakraCore.JsCreateRuntime(0, 0, byref(runtime))

        context = c_void_p()
        chakraCore.JsCreateContext(runtime, byref(context))
        chakraCore.JsSetCurrentContext(context)

        # JsCreateString takes a `const char*`. ctypes passes a text string
        # as `wchar_t*` on Python 3, so the name must be a bytes literal.
        scriptName = b'iuam-challenge.js'
        fname = c_void_p()
        chakraCore.JsCreateString(
            scriptName,
            len(scriptName),
            byref(fname)
        )

        scriptSource = c_void_p()
        chakraCore.JsCreateExternalArrayBuffer(
            script,
            len(script),
            0,
            0,
            byref(scriptSource)
        )

        # NOTE(review): 0x02 is presumably the "array buffer is UTF-16
        # encoded" parse attribute, matching the encoding above -- confirm.
        jsResult = c_void_p()
        chakraCore.JsRun(scriptSource, 0, fname, 0x02, byref(jsResult))

        resultJSString = c_void_p()
        chakraCore.JsConvertValueToString(jsResult, byref(resultJSString))

        # First call only asks for the length; second call copies the text.
        stringLength = c_size_t()
        chakraCore.JsCopyString(resultJSString, 0, 0, byref(stringLength))

        resultSTR = create_string_buffer(stringLength.value + 1)
        chakraCore.JsCopyString(
            resultJSString,
            byref(resultSTR),
            stringLength.value + 1,
            0
        )

        chakraCore.JsDisposeRuntime(runtime)

        return resultSTR.value
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()
@@ -0,0 +1,58 @@
import logging
import re
# ------------------------------------------------------------------------------- #
def template(body, domain):
    """Build the JavaScript payload for a Cloudflare IUAM challenge page.

    The challenge script is cut out of ``body``, whitespace-normalised and
    prefixed with a small stub environment (a ``document`` object exposing
    ``createElement`` / ``getElementById`` and ``String.prototype.italics``).
    The payload ends with ``a.value;`` so that evaluating it yields the
    computed answer.

    :param body: HTML of the challenge page.
    :param domain: host name used for the stubbed anchor ``href``.
    :returns: the JavaScript source (environment followed by challenge).
    :raises ValueError: if the challenge script cannot be located in ``body``.
    """
    BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'

    try:
        js = re.search(
            r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
            body
        ).group(1)
    except Exception:
        raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))

    js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
    js += '\na.value;'

    jsEnv = '''
        String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
        var document = {{
            createElement: function () {{
                return {{ firstChild: {{ href: "https://{domain}/" }} }}
            }},
            getElementById: function () {{
                return {{"innerHTML": "{innerHTML}"}};
            }}
        }};
    '''

    try:
        # Text of the first <div id="..."> element; the challenge reads it
        # back through document.getElementById(...).innerHTML.
        innerHTML = re.search(
            r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
            body,
            re.MULTILINE | re.DOTALL
        )
        innerHTML = innerHTML.group(2) if innerHTML else ''
    except Exception:
        logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
        raise

    # The flags must be passed by keyword: the fourth positional parameter of
    # re.sub() is ``count``, which would cap the number of replacements.
    environment = re.sub(
        r'\s{2,}',
        ' ',
        jsEnv.format(
            domain=domain,
            innerHTML=innerHTML
        ),
        flags=re.MULTILINE | re.DOTALL
    )

    return '{}{}'.format(environment, js)
# ------------------------------------------------------------------------------- #
@@ -6,27 +6,39 @@ import base64
from . import JavaScriptInterpreter
from .encapsulated import template
from .jsunfuck import jsunfuck
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """Interpreter backend that evaluates the challenge with js2py."""

    # ------------------------------------------------------------------------------- #

    def __init__(self):
        """Register this backend under the name ``'js2py'``."""
        super(ChallengeInterpreter, self).__init__('js2py')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """Evaluate the challenge built by ``template(body, domain)`` in js2py.

        :param body: HTML of the challenge page.
        :param domain: host name the challenge was served for.
        :returns: whatever ``js2py.EvalJs.eval`` returns for the payload.
        """
        jsPayload = template(body, domain)

        # Probe expression: when js2py evaluates it to '1' the installed
        # version needs the payload rewritten by jsunfuck() first.
        if js2py.eval_js('(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]') == '1':
            logging.warning('WARNING - Please upgrade your js2py https://github.com/PiotrDabkowski/Js2Py, applying work around for the meantime.')
            jsPayload = jsunfuck(jsPayload)

        def atob(s):
            # Base64 decoder exposed to the script as ``atob``.
            return base64.b64decode('{}'.format(s)).decode('utf-8')

        js2py.disable_pyimport()
        context = js2py.EvalJs({'atob': atob})
        return context.eval(jsPayload)


# ------------------------------------------------------------------------------- #

ChallengeInterpreter()
@@ -0,0 +1,120 @@
from __future__ import absolute_import
import re
import operator as op
from . import JavaScriptInterpreter
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    # Interpreter backend registered as 'native': it computes the challenge
    # answer with regular expressions and Python arithmetic instead of handing
    # the script to a JavaScript engine.

    def __init__(self):
        super(ChallengeInterpreter, self).__init__('native')

    def eval(self, body, domain):
        """Compute the challenge answer for ``body`` / ``domain``.

        Returns the answer formatted as a string with ten decimal places.
        A body the challenge regex does not match makes ``re.search`` return
        ``None`` and therefore raises ``AttributeError``.
        """

        # ------------------------------------------------------------------------------- #

        # Maps the operator character found in the script to its Python function.
        operators = {
            '+': op.add,
            '-': op.sub,
            '*': op.mul,
            '/': op.truediv
        }

        # ------------------------------------------------------------------------------- #

        def jsfuckToNumber(jsFuck):
            # '!+[]' and '!![]' become '1' and '[]' becomes '0'; every
            # parenthesised group is then summed and the sums are concatenated
            # as decimal digits to form the resulting integer.
            t = ''
            split_numbers = re.compile(r'-?\d+').findall

            for i in re.findall(
                r'\((?:\d|\+|\-)*\)',
                jsFuck.replace('!+[]', '1').replace('!![]', '1').replace('[]', '0').lstrip('+').replace('(+', '(')
            ):
                t = '{}{}'.format(t, sum(int(x) for x in split_numbers(i)))

            return int(t)

        # ------------------------------------------------------------------------------- #

        def divisorMath(payload, needle, domain):
            # payload has the form '<numerator>/<denominator>'. When the
            # denominator contains ``needle`` it combines a number with the
            # character code of one character of ``domain``.
            jsfuckMath = payload.split('/')
            if needle in jsfuckMath[1]:
                # expression[0]: number text, expression[1]: operator character.
                expression = re.findall(r"^(.*?)(.)\(function", jsfuckMath[1])[0]

                expression_value = operators[expression[1]](
                    float(jsfuckToNumber(expression[0])),
                    # Index into domain is encoded after the '"("+p+")")}' marker.
                    float(ord(domain[jsfuckToNumber(jsfuckMath[1][
                        jsfuckMath[1].find('"("+p+")")}') + len('"("+p+")")}'):-2
                    ])]))
                )
            else:
                expression_value = jsfuckToNumber(jsfuckMath[1])

            expression_value = jsfuckToNumber(jsfuckMath[0]) / float(expression_value)

            return expression_value

        # ------------------------------------------------------------------------------- #

        def challengeSolve(body, domain):
            jschl_answer = 0

            # Named groups: variable (accumulator name), init (initial value),
            # challenge (statement list up to a.value), k (optional text of
            # the element whose id starts with 'cf-dn-').
            jsfuckChallenge = re.search(
                r"setTimeout\(function\(\){\s+var.*?f,\s*(?P<variable>\w+).*?:(?P<init>\S+)};"
                r".*?\('challenge-form'\);\s+;(?P<challenge>.*?a\.value)"
                r"(?:.*id=\"cf-dn-.*?>(?P<k>\S+)<)?",
                body,
                re.DOTALL | re.MULTILINE
            ).groupdict()

            # Replace the raw statement text by an iterator over
            # (operator, expression) pairs applied to the accumulator.
            jsfuckChallenge['challenge'] = re.finditer(
                r'{}.*?([+\-*/])=(.*?);(?=a\.value|{})'.format(
                    jsfuckChallenge['variable'],
                    jsfuckChallenge['variable']
                ),
                jsfuckChallenge['challenge']
            )

            # ------------------------------------------------------------------------------- #

            # Initial accumulator value, optionally written as a division.
            if '/' in jsfuckChallenge['init']:
                val = jsfuckChallenge['init'].split('/')
                jschl_answer = jsfuckToNumber(val[0]) / float(jsfuckToNumber(val[1]))
            else:
                jschl_answer = jsfuckToNumber(jsfuckChallenge['init'])

            # ------------------------------------------------------------------------------- #

            for expressionMatch in jsfuckChallenge['challenge']:
                oper, expression = expressionMatch.groups()

                if '/' in expression:
                    expression_value = divisorMath(expression, 'function(p)', domain)
                else:
                    if 'Element' in expression:
                        # Operand comes from the captured element text (k).
                        expression_value = divisorMath(jsfuckChallenge['k'], '"("+p+")")}', domain)
                    else:
                        expression_value = jsfuckToNumber(expression)

                jschl_answer = operators[oper](jschl_answer, expression_value)

            # ------------------------------------------------------------------------------- #

            # Without a captured k, a script containing '+ t.length' adds the
            # length of the domain to the answer.
            if not jsfuckChallenge['k'] and '+ t.length' in body:
                jschl_answer += len(domain)

            # ------------------------------------------------------------------------------- #

            return '{0:.10f}'.format(jschl_answer)

        # ------------------------------------------------------------------------------- #

        return challengeSolve(body, domain)


# ------------------------------------------------------------------------------- #

ChallengeInterpreter()
@@ -1,22 +1,23 @@
import base64
import logging
import subprocess
import sys
from . import JavaScriptInterpreter
from .encapsulated import template
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """Interpreter backend that evaluates the challenge in a Node.js subprocess."""

    # ------------------------------------------------------------------------------- #

    def __init__(self):
        """Register this backend under the name ``'nodejs'``."""
        super(ChallengeInterpreter, self).__init__('nodejs')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """Run the challenge built by ``template(body, domain)`` with ``node -e``.

        The payload is transported base64-encoded and executed through
        ``require("vm").runInNewContext`` with a 4000 ms timeout.

        :param body: HTML of the challenge page.
        :param domain: host name the challenge was served for.
        :returns: the bytes written to stdout by the node process.
        :raises EnvironmentError: if the ``node`` executable is missing.
        :raises RuntimeError: on any other non-OSError failure.
        """
        try:
            js = 'var atob = function(str) {return Buffer.from(str, "base64").toString("binary");};' \
                 'var challenge = atob("%s");' \
                 'var context = {atob: atob};' \
                 'var options = {filename: "iuam-challenge.js", timeout: 4000};' \
                 'var answer = require("vm").runInNewContext(challenge, context, options);' \
                 'process.stdout.write(String(answer));' \
                 % base64.b64encode(template(body, domain).encode('UTF-8')).decode('ascii')

            return subprocess.check_output(['node', '-e', js])

        except OSError as e:
            # errno 2: the executable could not be found.
            if e.errno == 2:
                raise EnvironmentError(
                    'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`).\n\n'
                    'Your Node binary may be called `nodejs` rather than `node`, '
                    'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems.\n\n'
                    '(Please read the cloudscraper README\'s Dependencies section: '
                    'https://github.com/VeNoMouS/cloudscraper#dependencies.)'
                )
            raise
        except Exception:
            sys.tracebacklimit = 0
            raise RuntimeError('Error executing Cloudflare IUAM Javascript in nodejs')


# ------------------------------------------------------------------------------- #

ChallengeInterpreter()
@@ -0,0 +1,33 @@
from __future__ import absolute_import
import sys
try:
import v8eval
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError('Please install the python module v8eval either via pip or download it from https://github.com/sony/v8eval')
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """Interpreter backend that evaluates the challenge with the v8eval module."""

    def __init__(self):
        """Register this backend under the name ``'v8'``."""
        super(ChallengeInterpreter, self).__init__('v8')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """Evaluate the challenge built by ``template(body, domain)`` in V8.

        :param body: HTML of the challenge page.
        :param domain: host name the challenge was served for.
        :returns: the value returned by ``v8eval.V8().eval``.
        :raises RuntimeError: if the engine raises ``TypeError`` or
            ``v8eval.V8Error``.
        """
        try:
            return v8eval.V8().eval(template(body, domain))
        except (TypeError, v8eval.V8Error):
            # The exception has to be raised; merely constructing it would
            # make this method silently return None.
            raise RuntimeError('We encountered an error running the V8 Engine.')


# ------------------------------------------------------------------------------- #

ChallengeInterpreter()
@@ -0,0 +1,236 @@
from __future__ import absolute_import
import requests
from ..exceptions import (
reCaptchaServiceUnavailable,
reCaptchaAPIError,
reCaptchaTimeout,
reCaptchaParameter,
reCaptchaBadJobID,
reCaptchaReportError
)
try:
import polling
except ImportError:
raise ImportError(
"Please install the python module 'polling' via pip or download it from "
"https://github.com/justiniso/polling/"
)
from . import reCaptcha
class captchaSolver(reCaptcha):
    """reCaptcha provider that talks to the 2captcha HTTP API."""

    def __init__(self):
        """Register the provider as ``'2captcha'`` and create its HTTP session."""
        super(captchaSolver, self).__init__('2captcha')
        self.host = 'https://2captcha.com'
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response, request_type):
        """Raise if ``response`` carries a known 2captcha error.

        :param response: the HTTP response of an ``in.php`` / ``res.php`` call.
        :param request_type: ``'in.php'`` or ``'res.php'``; selects the table
            of error codes to check against.
        :raises reCaptchaServiceUnavailable: on HTTP status 500 or 502.
        :raises reCaptchaAPIError: if the JSON payload reports a failure
            status together with a known error code.
        """
        if response.status_code in [500, 502]:
            raise reCaptchaServiceUnavailable('2Captcha: Server Side Error {}'.format(response.status_code))

        errors = {
            'in.php': {
                "ERROR_WRONG_USER_KEY": "You've provided api_key parameter value is in incorrect format, it should contain 32 symbols.",
                "ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
                "ERROR_ZERO_BALANCE": "You don't have sufficient funds on your account.",
                "ERROR_PAGEURL": "pageurl parameter is missing in your request.",
                "ERROR_NO_SLOT_AVAILABLE":
                    "No Slots Available.\nYou can receive this error in two cases:\n"
                    "1. If you solve ReCaptcha: the queue of your captchas that are not distributed to workers is too long. "
                    "Queue limit changes dynamically and depends on total amount of captchas awaiting solution and usually it's between 50 and 100 captchas.\n"
                    "2. If you solve Normal Captcha: your maximum rate for normal captchas is lower than current rate on the server."
                    "You can change your maximum rate in your account's settings.",
                "ERROR_IP_NOT_ALLOWED": "The request is sent from the IP that is not on the list of your allowed IPs.",
                "IP_BANNED": "Your IP address is banned due to many frequent attempts to access the server using wrong authorization keys.",
                "ERROR_BAD_TOKEN_OR_PAGEURL":
                    "You can get this error code when sending ReCaptcha V2. "
                    "That happens if your request contains invalid pair of googlekey and pageurl. "
                    "The common reason for that is that ReCaptcha is loaded inside an iframe hosted on another domain/subdomain.",
                "ERROR_GOOGLEKEY":
                    "You can get this error code when sending ReCaptcha V2. "
                    "That means that sitekey value provided in your request is incorrect: it's blank or malformed.",
                "MAX_USER_TURN": "You made more than 60 requests within 3 seconds.Your account is banned for 10 seconds. Ban will be lifted automatically."
            },
            'res.php': {
                "ERROR_CAPTCHA_UNSOLVABLE":
                    "We are unable to solve your captcha - three of our workers were unable solve it "
                    "or we didn't get an answer within 90 seconds (300 seconds for ReCaptcha V2). "
                    "We will not charge you for that request.",
                "ERROR_WRONG_USER_KEY": "You've provided api_key parameter value in incorrect format, it should contain 32 symbols.",
                "ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
                "ERROR_WRONG_ID_FORMAT": "You've provided captcha ID in wrong format. The ID can contain numbers only.",
                "ERROR_WRONG_CAPTCHA_ID": "You've provided incorrect captcha ID.",
                "ERROR_BAD_DUPLICATES":
                    "Error is returned when 100% accuracy feature is enabled. "
                    "The error means that max numbers of tries is reached but min number of matches not found.",
                "REPORT_NOT_RECORDED": "Error is returned to your complain request if you already complained lots of correctly solved captchas.",
                "ERROR_IP_ADDRES":
                    "You can receive this error code when registering a pingback (callback) IP or domain."
                    "That happes if your request is coming from an IP address that doesn't match the IP address of your pingback IP or domain.",
                "ERROR_TOKEN_EXPIRED": "You can receive this error code when sending GeeTest. That error means that challenge value you provided is expired.",
                "ERROR_EMPTY_ACTION": "Action parameter is missing or no value is provided for action parameter."
            }
        }

        payload = response.json()
        error_code = payload.get('request')

        # The JSON "status" is the integer 0 on failure; an identity test
        # against False never matches it, so compare by value.
        if payload.get('status') == 0 and error_code in errors.get(request_type):
            raise reCaptchaAPIError(
                '{} {}'.format(
                    error_code,
                    errors.get(request_type).get(error_code)
                )
            )

    # ------------------------------------------------------------------------------- #

    def reportJob(self, jobID):
        """Report job ``jobID`` as badly solved (``action=reportbad``).

        :returns: ``True`` once the API acknowledged the report.
        :raises reCaptchaBadJobID: if ``jobID`` is empty.
        :raises reCaptchaReportError: if polling yields no response.
        """
        if not jobID:
            raise reCaptchaBadJobID(
                "2Captcha: Error bad job id to request reCaptcha."
            )

        def _checkRequest(response):
            if response.ok and response.json().get('status') == 1:
                return response

            self.checkErrorStatus(response, 'res.php')

            return None

        response = polling.poll(
            lambda: self.session.get(
                '{}/res.php'.format(self.host),
                params={
                    'key': self.api_key,
                    'action': 'reportbad',
                    'id': jobID,
                    'json': '1'
                }
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return True
        else:
            raise reCaptchaReportError(
                "2Captcha: Error - Failed to report bad reCaptcha solve."
            )

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Poll ``res.php`` until job ``jobID`` is solved.

        :returns: the ``request`` field of the successful JSON answer.
        :raises reCaptchaBadJobID: if ``jobID`` is empty.
        :raises reCaptchaTimeout: if polling yields no response.
        """
        if not jobID:
            raise reCaptchaBadJobID("2Captcha: Error bad job id to request reCaptcha.")

        def _checkRequest(response):
            if response.ok and response.json().get('status') == 1:
                return response

            self.checkErrorStatus(response, 'res.php')

            return None

        response = polling.poll(
            lambda: self.session.get(
                '{}/res.php'.format(self.host),
                params={
                    'key': self.api_key,
                    'action': 'get',
                    'id': jobID,
                    'json': '1'
                }
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json().get('request')
        else:
            raise reCaptchaTimeout(
                "2Captcha: Error failed to solve reCaptcha."
            )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, site_url, site_key):
        """Submit a new reCaptcha job to ``in.php``.

        :returns: the job id (``request`` field of the JSON answer).
        :raises reCaptchaBadJobID: if polling yields no response.
        """
        def _checkRequest(response):
            if response.ok and response.json().get("status") == 1 and response.json().get('request'):
                return response

            self.checkErrorStatus(response, 'in.php')

            return None

        response = polling.poll(
            lambda: self.session.post(
                '{}/in.php'.format(self.host),
                data={
                    'key': self.api_key,
                    'method': 'userrecaptcha',
                    'googlekey': site_key,
                    'pageurl': site_url,
                    'json': '1',
                    'soft_id': '5507698'
                },
                allow_redirects=False
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json().get('request')
        else:
            raise reCaptchaBadJobID(
                '2Captcha: Error no job id was returned.'
            )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
        """Solve the reCaptcha identified by ``site_key`` on ``site_url``.

        :param reCaptchaParams: mapping with the mandatory ``api_key`` and an
            optional ``proxies`` mapping applied to the HTTP session.
        :returns: the answer returned by :meth:`requestJob`.
        :raises reCaptchaParameter: if ``api_key`` is missing.
        :raises reCaptchaTimeout: if solving (or reporting the timed-out job)
            exceeds the polling timeout.
        """
        jobID = None

        if not reCaptchaParams.get('api_key'):
            raise reCaptchaParameter(
                "2Captcha: Missing api_key parameter."
            )

        self.api_key = reCaptchaParams.get('api_key')

        # Test the same key that is read, so the session proxies are never
        # overwritten with None.
        proxies = reCaptchaParams.get('proxies')
        if proxies:
            self.session.proxies = proxies

        try:
            jobID = self.requestSolve(site_url, site_key)
            return self.requestJob(jobID)
        except polling.TimeoutException:
            try:
                if jobID:
                    self.reportJob(jobID)
            except polling.TimeoutException:
                raise reCaptchaTimeout(
                    "2Captcha: reCaptcha solve took to long and also failed reporting the job the job id {}.".format(jobID)
                )

            raise reCaptchaTimeout(
                "2Captcha: reCaptcha solve took to long to execute job id {}, aborting.".format(jobID)
            )


# ------------------------------------------------------------------------------- #

captchaSolver()
@@ -0,0 +1,207 @@
from __future__ import absolute_import
import re
import requests
try:
import polling
except ImportError:
raise ImportError(
"Please install the python module 'polling' via pip or download it from "
"https://github.com/justiniso/polling/"
)
from ..exceptions import (
reCaptchaServiceUnavailable,
reCaptchaAPIError,
reCaptchaTimeout,
reCaptchaParameter,
reCaptchaBadJobID
)
from . import reCaptcha
class captchaSolver(reCaptcha):
    """reCaptcha provider that talks to the 9kw.eu HTTP API."""

    def __init__(self):
        """Register the provider as ``'9kw'`` and create its HTTP session."""
        super(captchaSolver, self).__init__('9kw')
        self.host = 'https://www.9kw.eu/index.cgi'
        self.maxtimeout = 180
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise if ``response`` carries a 9kw error.

        A JSON body is checked for an ``error`` field; any other body is
        checked for a leading ``00<digits>`` error code.

        :raises reCaptchaServiceUnavailable: on HTTP status 500 or 502.
        :raises reCaptchaAPIError: if an error code is present.
        """
        if response.status_code in [500, 502]:
            raise reCaptchaServiceUnavailable(
                '9kw: Server Side Error {}'.format(response.status_code)
            )

        error_codes = {
            1: 'No API Key available.',
            2: 'No API key found.',
            3: 'No active API key found.',
            4: 'API Key has been disabled by the operator. ',
            5: 'No user found.',
            6: 'No data found.',
            7: 'Found No ID.',
            8: 'found No captcha.',
            9: 'No image found.',
            10: 'Image size not allowed.',
            11: 'credit is not sufficient.',
            12: 'what was done.',
            13: 'No answer contain.',
            14: 'Captcha already been answered.',
            15: 'Captcha to quickly filed.',
            16: 'JD check active.',
            17: 'Unknown problem.',
            18: 'Found No ID.',
            19: 'Incorrect answer.',
            20: 'Do not timely filed (Incorrect UserID).',
            21: 'Link not allowed.',
            22: 'Prohibited submit.',
            23: 'Entering prohibited.',
            24: 'Too little credit.',
            25: 'No entry found.',
            26: 'No Conditions accepted.',
            27: 'No coupon code found in the database.',
            28: 'Already unused voucher code.',
            29: 'maxTimeout under 60 seconds.',
            30: 'User not found.',
            31: 'An account is not yet 24 hours in system.',
            32: 'An account does not have the full rights.',
            33: 'Plugin needed a update.',
            34: 'No HTTPS allowed.',
            35: 'No HTTP allowed.',
            36: 'Source not allowed.',
            37: 'Transfer denied.',
            38: 'Incorrect answer without space',
            39: 'Incorrect answer with space',
            40: 'Incorrect answer with not only numbers',
            41: 'Incorrect answer with not only A-Z, a-z',
            42: 'Incorrect answer with not only 0-9, A-Z, a-z',
            43: 'Incorrect answer with not only [0-9,- ]',
            44: 'Incorrect answer with not only [0-9A-Za-z,- ]',
            45: 'Incorrect answer with not only coordinates',
            46: 'Incorrect answer with not only multiple coordinates',
            47: 'Incorrect answer with not only data',
            48: 'Incorrect answer with not only rotate number',
            49: 'Incorrect answer with not only text',
            50: 'Incorrect answer with not only text and too short',
            51: 'Incorrect answer with not enough chars',
            52: 'Incorrect answer with too many chars',
            53: 'Incorrect answer without no or yes',
            54: 'Assignment was not found.'
        }

        if response.text.startswith('{'):
            error = response.json().get('error')
            if error:
                raise reCaptchaAPIError(error_codes.get(int(error)))
        else:
            # A body without the '00<digits>' prefix simply carries no error
            # code; it must not crash on a missing match.
            match = re.search(r'^00(?P<error_code>\d+)', response.text)
            error_code = int(match.group('error_code')) if match else 0
            if error_code:
                raise reCaptchaAPIError(error_codes.get(error_code))

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Poll the API until job ``jobID`` has an answer.

        :returns: the ``answer`` field of the JSON reply.
        :raises reCaptchaBadJobID: if ``jobID`` is empty.
        :raises reCaptchaTimeout: if polling yields no response.
        """
        if not jobID:
            raise reCaptchaBadJobID(
                "9kw: Error bad job id to request reCaptcha against."
            )

        def _checkRequest(response):
            if response.ok and response.json().get('answer') != 'NO DATA':
                return response

            self.checkErrorStatus(response)

            return None

        response = polling.poll(
            lambda: self.session.get(
                self.host,
                params={
                    'apikey': self.api_key,
                    'action': 'usercaptchacorrectdata',
                    'id': jobID,
                    'info': 1,
                    'json': 1
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=(self.maxtimeout + 10)
        )

        if response:
            return response.json().get('answer')
        else:
            raise reCaptchaTimeout("9kw: Error failed to solve reCaptcha.")

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, site_url, site_key):
        """Submit a new reCaptcha job.

        :returns: the ``captchaid`` of the created job.
        :raises reCaptchaBadJobID: if polling yields no response.
        """
        def _checkRequest(response):
            if response.ok and response.text.startswith('{') and response.json().get('captchaid'):
                return response

            self.checkErrorStatus(response)

            return None

        response = polling.poll(
            lambda: self.session.post(
                self.host,
                data={
                    'apikey': self.api_key,
                    'action': 'usercaptchaupload',
                    'interactive': 1,
                    'file-upload-01': site_key,
                    'oldsource': 'recaptchav2',
                    'pageurl': site_url,
                    'maxtimeout': self.maxtimeout,
                    'json': 1
                },
                allow_redirects=False
            ),
            check_success=_checkRequest,
            step=5,
            timeout=(self.maxtimeout + 10)
        )

        if response:
            return response.json().get('captchaid')
        else:
            raise reCaptchaBadJobID('9kw: Error no valid job id was returned.')

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
        """Solve the reCaptcha identified by ``site_key`` on ``site_url``.

        :param reCaptchaParams: mapping with the mandatory ``api_key`` and the
            optional ``maxtimeout`` and ``proxies`` entries.
        :returns: the answer returned by :meth:`requestJob`.
        :raises reCaptchaParameter: if ``api_key`` is missing.
        :raises reCaptchaTimeout: if polling times out.
        """
        jobID = None

        if not reCaptchaParams.get('api_key'):
            raise reCaptchaParameter("9kw: Missing api_key parameter.")

        self.api_key = reCaptchaParams.get('api_key')

        if reCaptchaParams.get('maxtimeout'):
            self.maxtimeout = reCaptchaParams.get('maxtimeout')

        # Test the same key that is read, so the session proxies are never
        # overwritten with None.
        proxies = reCaptchaParams.get('proxies')
        if proxies:
            self.session.proxies = proxies

        try:
            jobID = self.requestSolve(site_url, site_key)
            return self.requestJob(jobID)
        except polling.TimeoutException:
            raise reCaptchaTimeout(
                "9kw: reCaptcha solve took to long to execute 'captchaid' {}, aborting.".format(jobID)
            )


# ------------------------------------------------------------------------------- #

captchaSolver()
@@ -0,0 +1,46 @@
import abc
import logging
import sys
# abc.ABC exists from Python 3.4 on; older interpreters get an equivalent
# base class built straight from the metaclass.
if sys.version_info >= (3, 4):
    ABC = abc.ABC  # noqa
else:
    ABC = abc.ABCMeta('ABC', (), {})

# ------------------------------------------------------------------------------- #

# Registry of provider instances, keyed by provider name.
captchaSolvers = {}

# ------------------------------------------------------------------------------- #


class reCaptcha(ABC):
    """Base class of the anti reCaptcha providers.

    Instantiating a provider stores it in ``captchaSolvers`` under its name.
    """

    @abc.abstractmethod
    def __init__(self, name):
        captchaSolvers[name] = self

    # ------------------------------------------------------------------------------- #

    @classmethod
    def dynamicImport(cls, name):
        """Return the provider registered as ``name``, importing it on demand.

        The provider module is expected to be a sub-module of this class's
        module and to register an instance while being imported.

        :raises ImportError: if the module cannot be imported or did not
            register a provider instance.
        """
        if name in captchaSolvers:
            return captchaSolvers[name]

        module_path = '{}.{}'.format(cls.__module__, name)
        try:
            __import__(module_path)
            registered = captchaSolvers.get(name)
            if not isinstance(registered, reCaptcha):
                raise ImportError('The anti reCaptcha provider was not initialized.')
        except ImportError:
            logging.error("Unable to load {} anti reCaptcha provider".format(name))
            raise

        return captchaSolvers[name]

    # ------------------------------------------------------------------------------- #

    @abc.abstractmethod
    def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
        """Provider-specific solving routine; must be overridden."""
        pass

    # ------------------------------------------------------------------------------- #

    def solveCaptcha(self, site_url, site_key, reCaptchaParams):
        """Public entry point; forwards to :meth:`getCaptchaAnswer`."""
        answer = self.getCaptchaAnswer(site_url, site_key, reCaptchaParams)
        return answer
@@ -0,0 +1,49 @@
from __future__ import absolute_import
from ..exceptions import reCaptchaParameter
try:
from python_anticaptcha import (
AnticaptchaClient,
NoCaptchaTaskProxylessTask
)
except ImportError:
raise ImportError(
"Please install the python module 'python_anticaptcha' via pip or download it from "
"https://github.com/ad-m/python-anticaptcha"
)
from . import reCaptcha
class captchaSolver(reCaptcha):
    """reCaptcha provider built on the ``python_anticaptcha`` client."""

    def __init__(self):
        """Register the provider as ``'anticaptcha'``."""
        super(captchaSolver, self).__init__('anticaptcha')

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
        """Solve the reCaptcha identified by ``site_key`` on ``site_url``.

        :param reCaptchaParams: mapping with the mandatory ``api_key`` and an
            optional ``proxies`` mapping applied to the client's session.
        :returns: the solution response of the created job.
        :raises reCaptchaParameter: if ``api_key`` is missing.
        :raises NotImplementedError: if the installed client has no
            ``createTaskSmee`` method.
        """
        if not reCaptchaParams.get('api_key'):
            raise reCaptchaParameter("anticaptcha: Missing api_key parameter.")

        client = AnticaptchaClient(reCaptchaParams.get('api_key'))

        # Test the same key that is read, so the session proxies are never
        # overwritten with None.
        proxies = reCaptchaParams.get('proxies')
        if proxies:
            client.session.proxies = proxies

        task = NoCaptchaTaskProxylessTask(site_url, site_key)

        if not hasattr(client, 'createTaskSmee'):
            raise NotImplementedError(
                "Please upgrade 'python_anticaptcha' via pip or download it from "
                "https://github.com/ad-m/python-anticaptcha"
            )

        job = client.createTaskSmee(task)
        return job.get_solution_response()


# ------------------------------------------------------------------------------- #

captchaSolver()
@@ -0,0 +1,227 @@
from __future__ import absolute_import
import json
import requests
try:
import polling
except ImportError:
raise ImportError(
"Please install the python module 'polling' via pip or download it from "
"https://github.com/justiniso/polling/"
)
from ..exceptions import (
reCaptchaServiceUnavailable,
reCaptchaAccountError,
reCaptchaTimeout,
reCaptchaParameter,
reCaptchaBadJobID,
reCaptchaReportError
)
from . import reCaptcha
class captchaSolver(reCaptcha):
    """reCaptcha provider that talks to the DeathByCaptcha HTTP API."""

    def __init__(self):
        """Register the provider as ``'deathbycaptcha'`` and create its session."""
        super(captchaSolver, self).__init__('deathbycaptcha')
        self.host = 'http://api.dbcapi.me/api'
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise ``reCaptchaServiceUnavailable`` for HTTP status 400, 403 or 503."""
        errors = {
            400: "DeathByCaptcha: 400 Bad Request",
            403: "DeathByCaptcha: 403 Forbidden - Invalid credentails or insufficient credits.",
            # 500: "DeathByCaptcha: 500 Internal Server Error.",
            503: "DeathByCaptcha: 503 Service Temporarily Unavailable."
        }

        if response.status_code in errors:
            raise reCaptchaServiceUnavailable(errors.get(response.status_code))

    # ------------------------------------------------------------------------------- #

    def login(self, username, password):
        """Store the credentials and verify the account via ``/user``.

        :raises reCaptchaAccountError: if the account is banned or its
            balance is zero.
        """
        self.username = username
        self.password = password

        def _checkRequest(response):
            if response.ok:
                account = response.json()
                if account.get('is_banned'):
                    raise reCaptchaAccountError('DeathByCaptcha: Your account is banned.')

                # The account payload names this field 'balance'.
                if account.get('balance') == 0:
                    raise reCaptchaAccountError('DeathByCaptcha: insufficient credits.')

                return response

            self.checkErrorStatus(response)

            return None

        # The result is only needed for the checks performed while polling.
        polling.poll(
            lambda: self.session.post(
                '{}/user'.format(self.host),
                headers={'Accept': 'application/json'},
                data={
                    'username': self.username,
                    'password': self.password
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=120
        )

    # ------------------------------------------------------------------------------- #

    def reportJob(self, jobID):
        """Report job ``jobID`` as incorrectly solved.

        :returns: ``True`` once the API answered with HTTP 200.
        :raises reCaptchaBadJobID: if ``jobID`` is empty.
        :raises reCaptchaReportError: if polling yields no response.
        """
        if not jobID:
            raise reCaptchaBadJobID(
                "DeathByCaptcha: Error bad job id to report failed reCaptcha."
            )

        def _checkRequest(response):
            if response.status_code == 200:
                return response

            self.checkErrorStatus(response)

            return None

        response = polling.poll(
            lambda: self.session.post(
                '{}/captcha/{}/report'.format(self.host, jobID),
                headers={'Accept': 'application/json'},
                data={
                    'username': self.username,
                    'password': self.password
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return True
        else:
            raise reCaptchaReportError(
                "DeathByCaptcha: Error report failed reCaptcha."
            )

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Poll ``/captcha/<jobID>`` until the job carries a ``text`` answer.

        :returns: the ``text`` field of the JSON reply.
        :raises reCaptchaBadJobID: if ``jobID`` is empty.
        :raises reCaptchaTimeout: if polling yields no response.
        """
        if not jobID:
            raise reCaptchaBadJobID(
                "DeathByCaptcha: Error bad job id to request reCaptcha."
            )

        def _checkRequest(response):
            if response.ok and response.json().get('text'):
                return response

            self.checkErrorStatus(response)

            return None

        response = polling.poll(
            lambda: self.session.get(
                '{}/captcha/{}'.format(self.host, jobID),
                headers={'Accept': 'application/json'}
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return response.json().get('text')
        else:
            raise reCaptchaTimeout(
                "DeathByCaptcha: Error failed to solve reCaptcha."
            )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, site_url, site_key):
        """Submit a new token job (``type`` 4) to ``/captcha``.

        :returns: the ``captcha`` field (job id) of the JSON reply.
        :raises reCaptchaBadJobID: if polling yields no response.
        """
        def _checkRequest(response):
            if response.ok and response.json().get("is_correct") and response.json().get('captcha'):
                return response

            self.checkErrorStatus(response)

            return None

        response = polling.poll(
            lambda: self.session.post(
                '{}/captcha'.format(self.host),
                headers={'Accept': 'application/json'},
                data={
                    'username': self.username,
                    'password': self.password,
                    'type': '4',
                    'token_params': json.dumps({
                        'googlekey': site_key,
                        'pageurl': site_url
                    })
                },
                allow_redirects=False
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return response.json().get('captcha')
        else:
            raise reCaptchaBadJobID(
                'DeathByCaptcha: Error no job id was returned.'
            )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
        """Solve the reCaptcha identified by ``site_key`` on ``site_url``.

        :param reCaptchaParams: mapping with the mandatory ``username`` and
            ``password`` and an optional ``proxies`` mapping applied to the
            HTTP session.
        :returns: the answer returned by :meth:`requestJob`.
        :raises reCaptchaParameter: if a credential is missing.
        :raises reCaptchaTimeout: if solving (or reporting the timed-out job)
            exceeds the polling timeout.
        """
        jobID = None

        for param in ['username', 'password']:
            if not reCaptchaParams.get(param):
                raise reCaptchaParameter(
                    "DeathByCaptcha: Missing '{}' parameter.".format(param)
                )
            setattr(self, param, reCaptchaParams.get(param))

        # Test the same key that is read, so the session proxies are never
        # overwritten with None.
        proxies = reCaptchaParams.get('proxies')
        if proxies:
            self.session.proxies = proxies

        try:
            jobID = self.requestSolve(site_url, site_key)
            return self.requestJob(jobID)
        except polling.TimeoutException:
            try:
                if jobID:
                    self.reportJob(jobID)
            except polling.TimeoutException:
                raise reCaptchaTimeout(
                    "DeathByCaptcha: reCaptcha solve took to long and also failed reporting the job id {}.".format(jobID)
                )

            raise reCaptchaTimeout(
                "DeathByCaptcha: reCaptcha solve took to long to execute job id {}, aborting.".format(jobID)
            )


# ------------------------------------------------------------------------------- #

captchaSolver()
@@ -1,40 +1,117 @@
import os
import json
import os
import random
import logging
import re
import sys
import ssl
from collections import OrderedDict
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class User_Agent():
    """Choose request headers and a cipher suite that imitate a browser.

    Browser definitions are read from ``browsers.json`` in this module's
    directory. After construction ``headers`` holds the header mapping to
    send and ``cipherSuite`` the list of ciphers for the chosen browser.
    """

    # ------------------------------------------------------------------------------- #

    def __init__(self, *args, **kwargs):
        """Start with empty state and fill it through ``loadUserAgent``."""
        self.headers = None
        self.cipherSuite = []
        self.loadUserAgent(*args, **kwargs)

    # ------------------------------------------------------------------------------- #

    def loadHeaders(self, user_agents, user_agent_version):
        """Set ``self.headers`` for one release of ``self.browser``.

        Uses the release's own ``headers`` entry when it is present and
        non-empty, otherwise the browser's ``default_headers``.

        :param user_agents: mapping of browser name to its definition.
        :param user_agent_version: key of the release inside ``releases``.
        """
        browser_data = user_agents.get(self.browser)
        release_headers = browser_data.get('releases').get(user_agent_version).get('headers')

        if release_headers:
            self.headers = release_headers
        else:
            self.headers = browser_data.get('default_headers')

    # ------------------------------------------------------------------------------- #

    def filterAgents(self, releases):
        """Collect the User-Agent strings allowed by the platform flags.

        :param releases: mapping of release key to a definition whose
            ``User-Agent`` entry has ``mobile`` and ``desktop`` lists.
        :return: dict of release key to the combined list of User-Agent
            strings (mobile first, then desktop); releases without any
            matching string are left out.
        """
        enabled = [
            platform
            for platform, wanted in (('mobile', self.mobile), ('desktop', self.desktop))
            if wanted
        ]

        filtered = {}
        for release, definition in releases.items():
            for platform in enabled:
                agents = definition['User-Agent'][platform]
                if agents:
                    filtered[release] = filtered.get(release, []) + agents

        return filtered

    # ------------------------------------------------------------------------------- #

    def tryMatchCustom(self, user_agents):
        """Try to find ``self.custom`` among the known User-Agent strings.

        On the first release whose mobile or desktop strings contain the
        custom value, adopt that browser's headers and cipher suite while
        keeping the custom string as the ``User-Agent`` header.

        :param user_agents: mapping of browser name to its definition.
        :return: ``True`` when a match was found, ``False`` otherwise.
        """
        # The custom string is matched literally, hence the escaping.
        pattern = re.escape(self.custom)

        for browser, definition in user_agents.items():
            for release, release_data in definition['releases'].items():
                for platform in ('mobile', 'desktop'):
                    known = ' '.join(release_data['User-Agent'][platform])
                    if re.search(pattern, known):
                        self.browser = browser
                        self.loadHeaders(user_agents, release)
                        self.headers['User-Agent'] = self.custom
                        self.cipherSuite = user_agents[self.browser].get('cipherSuite', [])
                        return True

        return False

    # ------------------------------------------------------------------------------- #

    def loadUserAgent(self, *args, **kwargs):
        """Resolve the options and populate ``headers`` and ``cipherSuite``.

        Recognised keyword arguments:

        * ``browser`` - a browser name, or a dict that may carry the keys
          ``browser``, ``custom``, ``desktop`` and ``mobile``.
        * ``custom`` - a User-Agent string to use verbatim.
        * ``desktop`` / ``mobile`` - platform switches, both default to True.
        * ``allow_brotli`` - when falsy, ``br`` is removed from the
          ``Accept-Encoding`` header.

        :raises RuntimeError: if both platforms are disabled, or if the
            requested browser is not defined in ``browsers.json``.
        """
        self.browser = kwargs.pop('browser', None)

        if isinstance(self.browser, dict):
            # Dict form: every option travels inside the 'browser' mapping.
            options = self.browser
            self.custom = options.get('custom', None)
            self.desktop = options.get('desktop', True)
            self.mobile = options.get('mobile', True)
            self.browser = options.get('browser', None)
        else:
            self.custom = kwargs.pop('custom', None)
            self.desktop = kwargs.pop('desktop', True)
            self.mobile = kwargs.pop('mobile', True)

        # Validate before touching the disk so bad options fail fast.
        if not self.desktop and not self.mobile:
            # NOTE: this changes traceback output for the whole process.
            sys.tracebacklimit = 0
            raise RuntimeError("Sorry you can't have mobile and desktop disabled at the same time.")

        # The context manager closes the file handle even if parsing fails.
        with open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r') as fp:
            user_agents = json.load(
                fp,
                object_pairs_hook=OrderedDict
            )

        if self.custom:
            if not self.tryMatchCustom(user_agents):
                # Unknown custom User-Agent: fall back to generic headers
                # and the interpreter's default ciphers minus two suites.
                self.cipherSuite = [
                    ssl._DEFAULT_CIPHERS,
                    '!AES128-SHA',
                    '!ECDHE-RSA-AES256-SHA',
                ]
                self.headers = OrderedDict([
                    ('User-Agent', self.custom),
                    ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'),
                    ('Accept-Language', 'en-US,en;q=0.9'),
                    ('Accept-Encoding', 'gzip, deflate, br')
                ])
        else:
            if self.browser and not user_agents.get(self.browser):
                sys.tracebacklimit = 0
                raise RuntimeError('Sorry "{}" browser User-Agent was not found.'.format(self.browser))

            chooser = random.SystemRandom()

            if not self.browser:
                self.browser = chooser.choice(list(user_agents))

            browser_data = user_agents.get(self.browser)
            self.cipherSuite = browser_data.get('cipherSuite', [])

            filteredAgents = self.filterAgents(browser_data.get('releases'))
            user_agent_version = chooser.choice(list(filteredAgents))

            self.loadHeaders(user_agents, user_agent_version)
            self.headers['User-Agent'] = chooser.choice(filteredAgents[user_agent_version])

        # Strip brotli once, after the final headers are known.
        if not kwargs.get('allow_brotli', False) and 'br' in self.headers['Accept-Encoding']:
            self.headers['Accept-Encoding'] = ','.join([
                encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br'
            ]).strip()
File diff suppressed because it is too large Load Diff
@@ -20,7 +20,7 @@ from exceptions import APIThrottled
from dogpile.cache.api import NO_VALUE
from subliminal.cache import region
from subliminal_patch.pitcher import pitchers
from cloudscraper import CloudScraper
from cloudscraper import CloudScraper, User_Agent
try:
import brotli
@@ -89,7 +89,9 @@ class CFSession(CloudScraper):
# Check if Cloudflare anti-bot is on
try:
if self.isChallengeRequest(resp):
print repr(resp)
if self.is_IUAM_Challenge(resp):
print "TRYYYYYYYYYY"
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.
@@ -97,9 +99,10 @@ class CFSession(CloudScraper):
resp = ourSuper.request(method, url, *args, **kwargs)
else:
# Solve Challenge
resp = self.sendChallengeResponse(resp, **kwargs)
resp = self.Challenge_Response(resp, **kwargs)
except ValueError, e:
print "YEEEEEEEEEEEEEE"
if e.message == "Captcha":
parsed_url = urlparse(url)
domain = parsed_url.netloc