mirror of
https://ghproxy.com/https://github.com/yoshiko2/Movie_Data_Capture.git
synced 2024-09-20 10:45:38 +00:00
603 lines
23 KiB
Python
603 lines
23 KiB
Python
|
# build-in lib
|
|||
|
import os.path
|
|||
|
import os
|
|||
|
import re
|
|||
|
import uuid
|
|||
|
import json
|
|||
|
import time
|
|||
|
import typing
|
|||
|
from unicodedata import category
|
|||
|
from concurrent.futures import ThreadPoolExecutor
|
|||
|
|
|||
|
# third party lib
|
|||
|
import requests
|
|||
|
from requests.adapters import HTTPAdapter
|
|||
|
import mechanicalsoup
|
|||
|
from pathlib import Path
|
|||
|
from urllib3.util.retry import Retry
|
|||
|
from lxml import etree
|
|||
|
from cloudscraper import create_scraper
|
|||
|
|
|||
|
# project wide
|
|||
|
import config
|
|||
|
|
|||
|
|
|||
|
def get_xpath_single(html_code: str, xpath):
|
|||
|
html = etree.fromstring(html_code, etree.HTMLParser())
|
|||
|
result1 = str(html.xpath(xpath)).strip(" ['']")
|
|||
|
return result1
|
|||
|
|
|||
|
|
|||
|
G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.133 Safari/537.36'
|
|||
|
|
|||
|
|
|||
|
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers=None):
|
|||
|
"""
|
|||
|
网页请求核心函数
|
|||
|
"""
|
|||
|
verify = config.getInstance().cacert_file()
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
errors = ""
|
|||
|
|
|||
|
headers = {"User-Agent": ua or G_USER_AGENT} # noqa
|
|||
|
if json_headers is not None:
|
|||
|
headers.update(json_headers)
|
|||
|
|
|||
|
for i in range(config_proxy.retry):
|
|||
|
try:
|
|||
|
if config_proxy.enable:
|
|||
|
proxies = config_proxy.proxies()
|
|||
|
result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, proxies=proxies,
|
|||
|
verify=verify,
|
|||
|
cookies=cookies)
|
|||
|
else:
|
|||
|
result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, cookies=cookies)
|
|||
|
|
|||
|
if return_type == "object":
|
|||
|
return result
|
|||
|
elif return_type == "content":
|
|||
|
return result.content
|
|||
|
else:
|
|||
|
result.encoding = encoding or result.apparent_encoding
|
|||
|
return result.text
|
|||
|
except Exception as e:
|
|||
|
print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
|
|||
|
errors = str(e)
|
|||
|
if "getaddrinfo failed" in errors:
|
|||
|
print("[-]Connect Failed! Please Check your proxy config")
|
|||
|
debug = config.getInstance().debug()
|
|||
|
if debug:
|
|||
|
print("[-]" + errors)
|
|||
|
else:
|
|||
|
print("[-]" + errors)
|
|||
|
print('[-]Connect Failed! Please check your Proxy or Network!')
|
|||
|
raise Exception('Connect Failed')
|
|||
|
|
|||
|
|
|||
|
def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
errors = ""
|
|||
|
headers_ua = {"User-Agent": G_USER_AGENT}
|
|||
|
if headers is None:
|
|||
|
headers = headers_ua
|
|||
|
else:
|
|||
|
headers.update(headers_ua)
|
|||
|
|
|||
|
for i in range(config_proxy.retry):
|
|||
|
try:
|
|||
|
if config_proxy.enable:
|
|||
|
proxies = config_proxy.proxies()
|
|||
|
result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=config_proxy.timeout)
|
|||
|
else:
|
|||
|
result = requests.post(url, data=query, headers=headers, timeout=config_proxy.timeout)
|
|||
|
return result
|
|||
|
except Exception as e:
|
|||
|
print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
|
|||
|
errors = str(e)
|
|||
|
print("[-]Connect Failed! Please check your Proxy or Network!")
|
|||
|
print("[-]" + errors)
|
|||
|
|
|||
|
|
|||
|
G_DEFAULT_TIMEOUT = 10 # seconds
|
|||
|
|
|||
|
|
|||
|
class TimeoutHTTPAdapter(HTTPAdapter):
|
|||
|
def __init__(self, *args, **kwargs):
|
|||
|
self.timeout = G_DEFAULT_TIMEOUT
|
|||
|
if "timeout" in kwargs:
|
|||
|
self.timeout = kwargs["timeout"]
|
|||
|
del kwargs["timeout"]
|
|||
|
super().__init__(*args, **kwargs)
|
|||
|
|
|||
|
def send(self, request, **kwargs):
|
|||
|
timeout = kwargs.get("timeout")
|
|||
|
if timeout is None:
|
|||
|
kwargs["timeout"] = self.timeout
|
|||
|
return super().send(request, **kwargs)
|
|||
|
|
|||
|
|
|||
|
# with keep-alive feature
|
|||
|
def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
|
|||
|
encoding: str = None):
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
session = requests.Session()
|
|||
|
if isinstance(cookies, dict) and len(cookies):
|
|||
|
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
|
|||
|
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
|
|||
|
status_forcelist=[429, 500, 502, 503, 504])
|
|||
|
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
if config_proxy.enable:
|
|||
|
session.verify = config.getInstance().cacert_file()
|
|||
|
session.proxies = config_proxy.proxies()
|
|||
|
headers = {"User-Agent": ua or G_USER_AGENT}
|
|||
|
session.headers = headers
|
|||
|
try:
|
|||
|
if isinstance(url, str) and len(url):
|
|||
|
result = session.get(str(url))
|
|||
|
else: # 空url参数直接返回可重用session对象,无需设置return_type
|
|||
|
return session
|
|||
|
if not result.ok:
|
|||
|
return None
|
|||
|
if return_type == "object":
|
|||
|
return result
|
|||
|
elif return_type == "content":
|
|||
|
return result.content
|
|||
|
elif return_type == "session":
|
|||
|
return result, session
|
|||
|
else:
|
|||
|
result.encoding = encoding or "utf-8"
|
|||
|
return result.text
|
|||
|
except requests.exceptions.ProxyError:
|
|||
|
print("[-]get_html_session() Proxy error! Please check your Proxy")
|
|||
|
except requests.exceptions.RequestException:
|
|||
|
pass
|
|||
|
except Exception as e:
|
|||
|
print(f"[-]get_html_session() failed. {e}")
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
|
|||
|
encoding: str = None, use_scraper: bool = False):
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
|
|||
|
if isinstance(cookies, dict) and len(cookies):
|
|||
|
requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
|
|||
|
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
|
|||
|
status_forcelist=[429, 500, 502, 503, 504])
|
|||
|
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
if config_proxy.enable:
|
|||
|
s.verify = config.getInstance().cacert_file()
|
|||
|
s.proxies = config_proxy.proxies()
|
|||
|
try:
|
|||
|
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
|
|||
|
if isinstance(url, str) and len(url):
|
|||
|
result = browser.open(url)
|
|||
|
else:
|
|||
|
return browser
|
|||
|
if not result.ok:
|
|||
|
return None
|
|||
|
|
|||
|
if return_type == "object":
|
|||
|
return result
|
|||
|
elif return_type == "content":
|
|||
|
return result.content
|
|||
|
elif return_type == "browser":
|
|||
|
return result, browser
|
|||
|
else:
|
|||
|
result.encoding = encoding or "utf-8"
|
|||
|
return result.text
|
|||
|
except requests.exceptions.ProxyError:
|
|||
|
print("[-]get_html_by_browser() Proxy error! Please check your Proxy")
|
|||
|
except Exception as e:
|
|||
|
print(f'[-]get_html_by_browser() Failed! {e}')
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
|
|||
|
return_type: str = None, encoding: str = None):
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
s = requests.Session()
|
|||
|
if isinstance(cookies, dict) and len(cookies):
|
|||
|
requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
|
|||
|
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
|
|||
|
status_forcelist=[429, 500, 502, 503, 504])
|
|||
|
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
if config_proxy.enable:
|
|||
|
s.verify = config.getInstance().cacert_file()
|
|||
|
s.proxies = config_proxy.proxies()
|
|||
|
try:
|
|||
|
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
|
|||
|
result = browser.open(url)
|
|||
|
if not result.ok:
|
|||
|
return None
|
|||
|
form = browser.select_form() if form_select is None else browser.select_form(form_select)
|
|||
|
if isinstance(fields, dict):
|
|||
|
for k, v in fields.items():
|
|||
|
browser[k] = v
|
|||
|
response = browser.submit_selected()
|
|||
|
|
|||
|
if return_type == "object":
|
|||
|
return response
|
|||
|
elif return_type == "content":
|
|||
|
return response.content
|
|||
|
elif return_type == "browser":
|
|||
|
return response, browser
|
|||
|
else:
|
|||
|
result.encoding = encoding or "utf-8"
|
|||
|
return response.text
|
|||
|
except requests.exceptions.ProxyError:
|
|||
|
print("[-]get_html_by_form() Proxy error! Please check your Proxy")
|
|||
|
except Exception as e:
|
|||
|
print(f'[-]get_html_by_form() Failed! {e}')
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
|
|||
|
encoding: str = None):
|
|||
|
config_proxy = config.getInstance().proxy()
|
|||
|
session = create_scraper(browser={'custom': ua or G_USER_AGENT, })
|
|||
|
if isinstance(cookies, dict) and len(cookies):
|
|||
|
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
|
|||
|
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
|
|||
|
status_forcelist=[429, 500, 502, 503, 504])
|
|||
|
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
|
|||
|
if config_proxy.enable:
|
|||
|
session.verify = config.getInstance().cacert_file()
|
|||
|
session.proxies = config_proxy.proxies()
|
|||
|
try:
|
|||
|
if isinstance(url, str) and len(url):
|
|||
|
result = session.get(str(url))
|
|||
|
else: # 空url参数直接返回可重用scraper对象,无需设置return_type
|
|||
|
return session
|
|||
|
if not result.ok:
|
|||
|
return None
|
|||
|
if return_type == "object":
|
|||
|
return result
|
|||
|
elif return_type == "content":
|
|||
|
return result.content
|
|||
|
elif return_type == "scraper":
|
|||
|
return result, session
|
|||
|
else:
|
|||
|
result.encoding = encoding or "utf-8"
|
|||
|
return result.text
|
|||
|
except requests.exceptions.ProxyError:
|
|||
|
print("[-]get_html_by_scraper() Proxy error! Please check your Proxy")
|
|||
|
except Exception as e:
|
|||
|
print(f"[-]get_html_by_scraper() failed. {e}")
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
# def get_javlib_cookie() -> [dict, str]:
|
|||
|
# import cloudscraper
|
|||
|
# switch, proxy, timeout, retry_count, proxytype = config.getInstance().proxy()
|
|||
|
# proxies = get_proxy(proxy, proxytype)
|
|||
|
#
|
|||
|
# raw_cookie = {}
|
|||
|
# user_agent = ""
|
|||
|
#
|
|||
|
# # Get __cfduid/cf_clearance and user-agent
|
|||
|
# for i in range(retry_count):
|
|||
|
# try:
|
|||
|
# if switch == 1 or switch == '1':
|
|||
|
# raw_cookie, user_agent = cloudscraper.get_cookie_string(
|
|||
|
# "http://www.javlibrary.com/",
|
|||
|
# proxies=proxies
|
|||
|
# )
|
|||
|
# else:
|
|||
|
# raw_cookie, user_agent = cloudscraper.get_cookie_string(
|
|||
|
# "http://www.javlibrary.com/"
|
|||
|
# )
|
|||
|
# except requests.exceptions.ProxyError:
|
|||
|
# print("[-] ProxyError, retry {}/{}".format(i + 1, retry_count))
|
|||
|
# except cloudscraper.exceptions.CloudflareIUAMError:
|
|||
|
# print("[-] IUAMError, retry {}/{}".format(i + 1, retry_count))
|
|||
|
#
|
|||
|
# return raw_cookie, user_agent
|
|||
|
|
|||
|
|
|||
|
def translate(
|
|||
|
src: str,
|
|||
|
target_language: str = config.getInstance().get_target_language(),
|
|||
|
engine: str = config.getInstance().get_translate_engine(),
|
|||
|
app_id: str = "",
|
|||
|
key: str = "",
|
|||
|
delay: int = 0,
|
|||
|
) -> str:
|
|||
|
"""
|
|||
|
translate japanese kana to simplified chinese
|
|||
|
翻译日语假名到简体中文
|
|||
|
:raises ValueError: Non-existent translation engine
|
|||
|
"""
|
|||
|
trans_result = ""
|
|||
|
# 中文句子如果包含&等符号会被谷歌翻译截断损失内容,而且中文翻译到中文也没有意义,故而忽略,只翻译带有日语假名的
|
|||
|
if (is_japanese(src) == False) and ("zh_" in target_language):
|
|||
|
return src
|
|||
|
if engine == "google-free":
|
|||
|
gsite = config.getInstance().get_translate_service_site()
|
|||
|
if not re.match('^translate\.google\.(com|com\.\w{2}|\w{2})$', gsite):
|
|||
|
gsite = 'translate.google.cn'
|
|||
|
url = (
|
|||
|
f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}&q={src}"
|
|||
|
)
|
|||
|
result = get_html(url=url, return_type="object")
|
|||
|
if not result.ok:
|
|||
|
print('[-]Google-free translate web API calling failed.')
|
|||
|
return ''
|
|||
|
|
|||
|
translate_list = [i["trans"] for i in result.json()["sentences"]]
|
|||
|
trans_result = trans_result.join(translate_list)
|
|||
|
elif engine == "azure":
|
|||
|
url = "https://api.cognitive.microsofttranslator.com/translate?api-version=3.0&to=" + target_language
|
|||
|
headers = {
|
|||
|
'Ocp-Apim-Subscription-Key': key,
|
|||
|
'Ocp-Apim-Subscription-Region': "global",
|
|||
|
'Content-type': 'application/json',
|
|||
|
'X-ClientTraceId': str(uuid.uuid4())
|
|||
|
}
|
|||
|
body = json.dumps([{'text': src}])
|
|||
|
result = post_html(url=url, query=body, headers=headers)
|
|||
|
translate_list = [i["text"] for i in result.json()[0]["translations"]]
|
|||
|
trans_result = trans_result.join(translate_list)
|
|||
|
elif engine == "deeplx":
|
|||
|
url = config.getInstance().get_translate_service_site()
|
|||
|
res = requests.post(f"{url}/translate", json={
|
|||
|
'text': src,
|
|||
|
'source_lang': 'auto',
|
|||
|
'target_lang': target_language,
|
|||
|
})
|
|||
|
if res.text.strip():
|
|||
|
trans_result = res.json().get('data')
|
|||
|
else:
|
|||
|
raise ValueError("Non-existent translation engine")
|
|||
|
|
|||
|
time.sleep(delay)
|
|||
|
return trans_result
|
|||
|
|
|||
|
|
|||
|
def load_cookies(cookie_json_filename: str) -> typing.Tuple[typing.Optional[dict], typing.Optional[str]]:
|
|||
|
"""
|
|||
|
加载cookie,用于以会员方式访问非游客内容
|
|||
|
|
|||
|
:filename: cookie文件名。获取cookie方式:从网站登录后,通过浏览器插件(CookieBro或EdittThisCookie)或者直接在地址栏网站链接信息处都可以复制或者导出cookie内容,以JSON方式保存
|
|||
|
|
|||
|
# 示例: FC2-755670 url https://javdb9.com/v/vO8Mn
|
|||
|
# json 文件格式
|
|||
|
# 文件名: 站点名.json,示例 javdb9.json
|
|||
|
# 内容(文件编码:UTF-8):
|
|||
|
{
|
|||
|
"over18":"1",
|
|||
|
"redirect_to":"%2Fv%2FvO8Mn",
|
|||
|
"remember_me_token":"***********",
|
|||
|
"_jdb_session":"************",
|
|||
|
"locale":"zh",
|
|||
|
"__cfduid":"*********",
|
|||
|
"theme":"auto"
|
|||
|
}
|
|||
|
"""
|
|||
|
filename = os.path.basename(cookie_json_filename)
|
|||
|
if not len(filename):
|
|||
|
return None, None
|
|||
|
path_search_order = (
|
|||
|
Path.cwd() / filename,
|
|||
|
Path.home() / filename,
|
|||
|
Path.home() / f".mdc/{filename}",
|
|||
|
Path.home() / f".local/share/mdc/{filename}"
|
|||
|
)
|
|||
|
cookies_filename = None
|
|||
|
try:
|
|||
|
for p in path_search_order:
|
|||
|
if p.is_file():
|
|||
|
cookies_filename = str(p.resolve())
|
|||
|
break
|
|||
|
if not cookies_filename:
|
|||
|
return None, None
|
|||
|
return json.loads(Path(cookies_filename).read_text(encoding='utf-8')), cookies_filename
|
|||
|
except:
|
|||
|
return None, None
|
|||
|
|
|||
|
|
|||
|
def file_modification_days(filename: str) -> int:
|
|||
|
"""
|
|||
|
文件修改时间距此时的天数
|
|||
|
"""
|
|||
|
mfile = Path(filename)
|
|||
|
if not mfile.is_file():
|
|||
|
return 9999
|
|||
|
mtime = int(mfile.stat().st_mtime)
|
|||
|
now = int(time.time())
|
|||
|
days = int((now - mtime) / (24 * 60 * 60))
|
|||
|
if days < 0:
|
|||
|
return 9999
|
|||
|
return days
|
|||
|
|
|||
|
|
|||
|
def file_not_exist_or_empty(filepath) -> bool:
|
|||
|
return not os.path.isfile(filepath) or os.path.getsize(filepath) == 0
|
|||
|
|
|||
|
|
|||
|
def is_japanese(raw: str) -> bool:
|
|||
|
"""
|
|||
|
日语简单检测
|
|||
|
"""
|
|||
|
return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))
|
|||
|
|
|||
|
|
|||
|
def download_file_with_filename(url: str, filename: str, path: str) -> None:
|
|||
|
"""
|
|||
|
download file save to give path with given name from given url
|
|||
|
"""
|
|||
|
conf = config.getInstance()
|
|||
|
config_proxy = conf.proxy()
|
|||
|
|
|||
|
for i in range(config_proxy.retry):
|
|||
|
try:
|
|||
|
if config_proxy.enable:
|
|||
|
if not os.path.exists(path):
|
|||
|
try:
|
|||
|
os.makedirs(path)
|
|||
|
except:
|
|||
|
print(f"[-]Fatal error! Can not make folder '{path}'")
|
|||
|
os._exit(0)
|
|||
|
r = get_html(url=url, return_type='content')
|
|||
|
if r == '':
|
|||
|
print('[-]Movie Download Data not found!')
|
|||
|
return
|
|||
|
with open(os.path.join(path, filename), "wb") as code:
|
|||
|
code.write(r)
|
|||
|
return
|
|||
|
else:
|
|||
|
if not os.path.exists(path):
|
|||
|
try:
|
|||
|
os.makedirs(path)
|
|||
|
except:
|
|||
|
print(f"[-]Fatal error! Can not make folder '{path}'")
|
|||
|
os._exit(0)
|
|||
|
r = get_html(url=url, return_type='content')
|
|||
|
if r == '':
|
|||
|
print('[-]Movie Download Data not found!')
|
|||
|
return
|
|||
|
with open(os.path.join(path, filename), "wb") as code:
|
|||
|
code.write(r)
|
|||
|
return
|
|||
|
except requests.exceptions.ProxyError:
|
|||
|
i += 1
|
|||
|
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
|
|||
|
except requests.exceptions.ConnectTimeout:
|
|||
|
i += 1
|
|||
|
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
|
|||
|
except requests.exceptions.ConnectionError:
|
|||
|
i += 1
|
|||
|
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
|
|||
|
except requests.exceptions.RequestException:
|
|||
|
i += 1
|
|||
|
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
|
|||
|
except IOError:
|
|||
|
raise ValueError(f"[-]Create Directory '{path}' failed!")
|
|||
|
return
|
|||
|
print('[-]Connect Failed! Please check your Proxy or Network!')
|
|||
|
raise ValueError('[-]Connect Failed! Please check your Proxy or Network!')
|
|||
|
return
|
|||
|
|
|||
|
|
|||
|
def download_one_file(args) -> str:
|
|||
|
"""
|
|||
|
download file save to given path from given url
|
|||
|
wrapped for map function
|
|||
|
"""
|
|||
|
|
|||
|
(url, save_path, json_headers) = args
|
|||
|
if json_headers is not None:
|
|||
|
filebytes = get_html(url, return_type='content', json_headers=json_headers['headers'])
|
|||
|
else:
|
|||
|
filebytes = get_html(url, return_type='content')
|
|||
|
if isinstance(filebytes, bytes) and len(filebytes):
|
|||
|
with save_path.open('wb') as fpbyte:
|
|||
|
if len(filebytes) == fpbyte.write(filebytes):
|
|||
|
return str(save_path)
|
|||
|
|
|||
|
|
|||
|
def parallel_download_files(dn_list: typing.Iterable[typing.Sequence], parallel: int = 0, json_headers=None):
|
|||
|
"""
|
|||
|
download files in parallel 多线程下载文件
|
|||
|
|
|||
|
用法示例: 2线程同时下载两个不同文件,并保存到不同路径,路径目录可未创建,但需要具备对目标目录和文件的写权限
|
|||
|
parallel_download_files([
|
|||
|
('https://site1/img/p1.jpg', 'C:/temp/img/p1.jpg'),
|
|||
|
('https://site2/cover/n1.xml', 'C:/tmp/cover/n1.xml')
|
|||
|
])
|
|||
|
|
|||
|
:dn_list: 可以是 tuple或者list: ((url1, save_fullpath1),(url2, save_fullpath2),) fullpath可以是str或Path
|
|||
|
:parallel: 并行下载的线程池线程数,为0则由函数自己决定
|
|||
|
"""
|
|||
|
mp_args = []
|
|||
|
for url, fullpath in dn_list:
|
|||
|
if url and isinstance(url, str) and url.startswith('http') \
|
|||
|
and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)):
|
|||
|
fullpath = Path(fullpath)
|
|||
|
fullpath.parent.mkdir(parents=True, exist_ok=True)
|
|||
|
mp_args.append((url, fullpath, json_headers))
|
|||
|
if not len(mp_args):
|
|||
|
return []
|
|||
|
if not isinstance(parallel, int) or parallel not in range(1, 200):
|
|||
|
parallel = min(5, len(mp_args))
|
|||
|
with ThreadPoolExecutor(parallel) as pool:
|
|||
|
results = list(pool.map(download_one_file, mp_args))
|
|||
|
return results
|
|||
|
|
|||
|
|
|||
|
def delete_all_elements_in_list(string: str, lists: typing.Iterable[str]):
|
|||
|
"""
|
|||
|
delete same string in given list
|
|||
|
"""
|
|||
|
new_lists = []
|
|||
|
for i in lists:
|
|||
|
if i != string:
|
|||
|
new_lists.append(i)
|
|||
|
return new_lists
|
|||
|
|
|||
|
|
|||
|
def delete_all_elements_in_str(string_delete: str, string: str):
|
|||
|
"""
|
|||
|
delete same string in given list
|
|||
|
"""
|
|||
|
for i in string:
|
|||
|
if i == string_delete:
|
|||
|
string = string.replace(i, "")
|
|||
|
return string
|
|||
|
|
|||
|
|
|||
|
# print format空格填充对齐内容包含中文时的空格计算
|
|||
|
def cn_space(v: str, n: int) -> int:
|
|||
|
return n - [category(c) for c in v].count('Lo')
|
|||
|
|
|||
|
|
|||
|
"""
|
|||
|
Usage: python ./ADC_function.py https://cn.bing.com/
|
|||
|
Purpose: benchmark get_html_session
|
|||
|
benchmark get_html_by_scraper
|
|||
|
benchmark get_html_by_browser
|
|||
|
benchmark get_html
|
|||
|
TODO: may be this should move to unittest directory
|
|||
|
"""
|
|||
|
if __name__ == "__main__":
|
|||
|
import sys, timeit
|
|||
|
from http.client import HTTPConnection
|
|||
|
|
|||
|
|
|||
|
def benchmark(times: int, url):
|
|||
|
print(f"HTTP GET Benchmark times:{times} url:{url}")
|
|||
|
tm = timeit.timeit(f"_ = session1.get('{url}')",
|
|||
|
"from __main__ import get_html_session;session1=get_html_session()",
|
|||
|
number=times)
|
|||
|
print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
|
|||
|
tm = timeit.timeit(f"_ = scraper1.get('{url}')",
|
|||
|
"from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
|
|||
|
number=times)
|
|||
|
print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
|
|||
|
tm = timeit.timeit(f"_ = browser1.open('{url}')",
|
|||
|
"from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
|
|||
|
number=times)
|
|||
|
print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
|
|||
|
tm = timeit.timeit(f"_ = get_html('{url}')",
|
|||
|
"from __main__ import get_html",
|
|||
|
number=times)
|
|||
|
print(f' *{tm:>10.5f}s get_html()')
|
|||
|
|
|||
|
|
|||
|
# target_url = "https://www.189.cn/"
|
|||
|
target_url = "http://www.chinaunicom.com"
|
|||
|
HTTPConnection.debuglevel = 1
|
|||
|
html_session = get_html_session()
|
|||
|
_ = html_session.get(target_url)
|
|||
|
HTTPConnection.debuglevel = 0
|
|||
|
|
|||
|
# times
|
|||
|
t = 100
|
|||
|
if len(sys.argv) > 1:
|
|||
|
target_url = sys.argv[1]
|
|||
|
benchmark(t, target_url)
|