Commit 7ab7cd7a by lichenggang

Initial commit

parents
seotest
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding" addBOMForNewFiles="with NO BOM" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (lseo)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/../seotest/.idea/seotest.iml" filepath="$PROJECT_DIR$/../seotest/.idea/seotest.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ChangeListManager">
<list default="true" id="4007b60f-39fd-4f58-919d-947ddb1c6e19" name="Default Changelist" comment="" />
<option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="CoverageDataManager">
<SUITE FILE_PATH="coverage/seotest$test.coverage" NAME="test Coverage Results" MODIFIED="1566973873935" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
</component>
<component name="FileEditorManager">
<leaf SIDE_TABS_SIZE_LIMIT_KEY="375">
<file pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/test.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="10" selection-start-line="10" selection-end-line="10" />
</state>
</provider>
</entry>
</file>
</leaf>
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Python Script" />
</list>
</option>
</component>
<component name="FindInProjectRecents">
<findStrings>
<find>1366</find>
<find>article</find>
<find>2019</find>
</findStrings>
</component>
<component name="IdeDocumentHistory">
<option name="CHANGED_PATHS">
<list>
<option value="$PROJECT_DIR$/test.py" />
</list>
</option>
</component>
<component name="ProjectFrameBounds" extendedState="7">
<option name="x" value="528" />
<option name="width" value="1400" />
<option name="height" value="909" />
</component>
<component name="ProjectView">
<navigator proportions="" version="1">
<foldersAlwaysOnTop value="true" />
</navigator>
<panes>
<pane id="ProjectPane">
<subPane>
<expand>
<path>
<item name="seotest" type="b2602c69:ProjectViewProjectNode" />
<item name="seotest" type="462c0819:PsiDirectoryNode" />
</path>
</expand>
<select />
</subPane>
</pane>
<pane id="Scope" />
</panes>
</component>
<component name="PropertiesComponent">
<property name="WebServerToolWindowFactoryState" value="false" />
<property name="nodejs_interpreter_path.stuck_in_default_project" value="undefined stuck path" />
<property name="nodejs_npm_path_reset_for_default_project" value="true" />
<property name="settings.editor.selected.configurable" value="com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable" />
</component>
<component name="RunDashboard">
<option name="ruleStates">
<list>
<RuleState>
<option name="name" value="ConfigurationTypeDashboardGroupingRule" />
</RuleState>
<RuleState>
<option name="name" value="StatusDashboardGroupingRule" />
</RuleState>
</list>
</option>
</component>
<component name="RunManager">
<configuration name="test" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="seotest" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python.test" />
</list>
</recent_temporary>
</component>
<component name="SvnConfiguration">
<configuration />
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="4007b60f-39fd-4f58-919d-947ddb1c6e19" name="Default Changelist" comment="" />
<created>1566529509632</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1566529509632</updated>
<workItem from="1566529511079" duration="1984000" />
<workItem from="1566868724482" duration="1354000" />
<workItem from="1566899977552" duration="1870000" />
<workItem from="1567049988122" duration="587000" />
</task>
<servers />
</component>
<component name="TimeTrackingManager">
<option name="totallyTimeSpent" value="5795000" />
</component>
<component name="ToolWindowManager">
<frame x="528" y="0" width="1400" height="909" extended-state="7" />
<layout>
<window_info content_ui="combo" id="Project" order="0" visible="true" weight="0.18674698" />
<window_info id="Structure" order="1" side_tool="true" weight="0.25" />
<window_info id="Favorites" order="2" side_tool="true" />
<window_info anchor="bottom" id="Message" order="0" />
<window_info anchor="bottom" id="Find" order="1" />
<window_info active="true" anchor="bottom" id="Run" order="2" visible="true" weight="0.5121387" />
<window_info anchor="bottom" id="Debug" order="3" weight="0.4" />
<window_info anchor="bottom" id="Cvs" order="4" weight="0.25" />
<window_info anchor="bottom" id="Inspection" order="5" weight="0.4" />
<window_info anchor="bottom" id="TODO" order="6" />
<window_info anchor="bottom" id="Docker" order="7" show_stripe_button="false" />
<window_info anchor="bottom" id="Version Control" order="8" />
<window_info anchor="bottom" id="Database Changes" order="9" />
<window_info anchor="bottom" id="Event Log" order="10" side_tool="true" />
<window_info anchor="bottom" id="Terminal" order="11" weight="0.32947975" />
<window_info anchor="bottom" id="Python Console" order="12" />
<window_info anchor="right" id="Commander" internal_type="SLIDING" order="0" type="SLIDING" weight="0.4" />
<window_info anchor="right" id="Ant Build" order="1" weight="0.25" />
<window_info anchor="right" content_ui="combo" id="Hierarchy" order="2" weight="0.25" />
<window_info anchor="right" id="SciView" order="3" />
<window_info anchor="right" id="Database" order="4" />
</layout>
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="1" />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/test.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="10" selection-start-line="10" selection-end-line="10" />
</state>
</provider>
</entry>
</component>
</project>
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
\ No newline at end of file
import tornado.web
import tornado.ioloop
from aiohttp.client_exceptions import ServerDisconnectedError
from utils.data_base import dict_data
from utils.base_handler import TornadoBaseHandler
import re
from urllib.parse import quote
from lxml import etree
from traceback import format_exc
base_url = 'https://www.baidu.com/s?wd={}'
main_headers={
"Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
"Accept-Encoding":"gzip, deflate, br",
"Accept-Language":"zh-CN,zh;q=0.9",
"Cache-Control":"no-cache",
"Connection":"keep-alive",
# "Cookie":"sugstore=0; BIDUPSID=4B040DA2662547DD78020CA62CA89B58; PSTM=1568098190; BAIDUID=737706451117D0B879B98A3A0C702EF7:FG=1; BD_UPN=12314753; BDORZ=B490B5EBF6F3CD402E515D22BCDA1598; ispeed_lsm=2; H_PS_PSSID=1450_21117_18559_29523_29721_29568_29221_26350_29589; yjs_js_security_passport=3726eaca955446d4228d8ff4462a6c288d632334_1568968075_js; H_PS_645EC=6c8dZ2NCQwQuroL3u9NmQzWkTImtuyF368MKcho936G4EsqdQJQ4lrTBesE; delPer=0; BD_CK_SAM=1; PSINO=6; BDSVRTM=0",
"Host":"www.baidu.com",
"Pragma":"no-cache",
"Sec-Fetch-Mode":"navigate",
"Sec-Fetch-Site":"same-origin",
"Sec-Fetch-User":"?1",
"Upgrade-Insecure-Requests":"1",
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36",
}
detail_referer="https://www.baidu.com/s?ie=UTF-8&wd={}"
snap_time_pat=re.compile(r'以下是该网页在北京时间 (.*) 的快照;')
detail_headers={
"Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
"Accept-Encoding":"gzip, deflate",
"Accept-Language":"zh-CN,zh;q=0.9",
"Cache-Control":"no-cache",
"Connection":"keep-alive",
"Host":"cache.baiducontent.com",
"Pragma":"no-cache",
"Upgrade-Insecure-Requests":"1",
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36",
}
class SeoCheckHandler(TornadoBaseHandler):
async def get(self, *args, **kwargs):
try:
data = await self.get_data()
except ServerDisconnectedError:
data = {'error':1,
'msg':'Temporarily unavailable'}
self.logging.error(format_exc())
except:
data={'error':1,
'msg':format_exc()}
self.logging.error(format_exc())
self.write(data)
async def get_data(self):
target = quote(self.get_argument('target'))
self.logging.info(base_url.format(target))
async with self.proxy_request(base_url.format(target),headers=main_headers) as resp:
t = await resp.text()
dom=etree.HTML(t)
data = dict_data()
data['baidu_url'] = base_url.format(target)
if dom.xpath('//div[@id="content_left"]'):
data['has_included'] = 1
snap_shot_url = dom.xpath('//a[contains(text(),"百度快照")]/@href')[0]
detail_headers['Referer'] = detail_referer.format(target)
async with self.proxy_request(snap_shot_url, headers=main_headers,allow_redirects=True) as resp:
detail = await resp.text()
snap_time = snap_time_pat.findall(detail)
if snap_time:
data['included_time'] = snap_time[0]
else:
data['included_time'] = '未知'
else:
data['has_included'] = 0
return data
def gen_app():
return tornado.web.Application(handlers=[(k, v) for k, v in register_tornado_handlers.items()])
if __name__ == '__main__':
register_tornado_handlers = {'/seocheck': SeoCheckHandler}
app = gen_app()
app.listen(9421)
tornado.ioloop.IOLoop.current().start()
\ No newline at end of file
import re
from urllib.parse import quote
import requests
pat=re.compile(r'GET (.*)HTTP/1.1')
from traceback import print_exc,format_exc
with open(r'C:\Users\ICHUNT\Desktop\FlumeData.1569254401514') as f:
text = f.readlines()
for i in text:
a=pat.findall(i)
if a:
prefix= 'http://127.0.0.1:9421/seocheck?target='
domain = 'www.ichunt.com'
url = prefix+domain+quote(a[0])
try:
res = requests.get(url).json()
print(res)
except:
print(url)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from utils.log_manager import LogManager
test_log = LogManager('test').get_logger_and_add_handlers(log_path='./logs', log_filename='test.log')
test_log.debug('这一句不会重复打印四次和写入日志四次')
#!/usr/bin/env python
# -*- coding:utf-8 -*-
\ No newline at end of file
No preview for this file type
No preview for this file type
No preview for this file type
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.web
from utils.log_manager import LogManager
from utils.proxy_client_util import MyProxyContextManger, proxy_manger
from utils.connecter_util import session_manger
class TornadoBaseHandler(tornado.web.RequestHandler):
session = session_manger.get_session()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logging = LogManager('seocheck').get_logger_and_add_handlers(log_level_int=4,log_path='./logs', log_filename='seocheck.log',is_add_stream_handler=True)
def proxy_request(self, url, headers=None, params=None, session=None, method='GET', vendor=None,
keep_same_proxy=False, request_timeout=15, allow_redirects=True, data=None, log=False,
verify_ssl=None, switch=True, max_redirects=10, proxy=None, proxy_auth=None):
if switch:
if proxy is None:
proxy, proxy_auth = proxy_manger.get_next(vendor, keep_same_proxy, self)
else:
proxy = proxy_auth = None
if session is None:
session = self.session
return MyProxyContextManger(None,
session._request(method, url, headers=headers, params=params,
timeout=request_timeout,
allow_redirects=allow_redirects, data=data, proxy=proxy,
proxy_auth=proxy_auth, verify_ssl=verify_ssl,
max_redirects=max_redirects))
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aiohttp
aio_session_max_client = 1000
class SessionManger(object):
def __init__(self, concurrency_limit=aio_session_max_client, loop=None,
disable_cookie=False):
self.session = self._generate_session(concurrency_limit=concurrency_limit, loop=loop,
disable_cookie=disable_cookie)
@staticmethod
def _generate_connector(limit, loop=None):
"""
https://github.com/KeepSafe/aiohttp/issues/883
if connector is passed to session, it is not available anymore
"""
if not loop:
loop = asyncio.get_event_loop()
return aiohttp.TCPConnector(limit=limit, loop=loop, verify_ssl=False)
def _generate_session(self, concurrency_limit, loop=None, disable_cookie=False):
if not loop:
loop = asyncio.get_event_loop()
cookie_jar = aiohttp.DummyCookieJar() if disable_cookie is True else None
return aiohttp.ClientSession(connector=self._generate_connector(limit=concurrency_limit, loop=loop),
loop=loop, cookie_jar=cookie_jar)
def get_session(self):
return self.session
# async def reset_session(self):
# await self.session.close()
# self.session = self._generate_session(concurrency_limit=1000, loop=None)
# def __del__(self):
# self.session.close()
# def __aexit__(self, exc_type, exc_val, exc_tb):
# self.session.close()
session_manger = SessionManger()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
def dict_data():
data={
'has_included':0,
'included_time':None,
'baidu_url':None,
'error':0,
'msg':''
}
return data
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import base64
import datetime
import html
import json
import random
import re
import string
import time
from urllib.parse import quote, quote_plus
import redis
# from fake_useragent import UserAgent
from lxml.etree import HTML, XML, XMLSyntaxError, tostring
# ua = UserAgent()
_local_pool = redis.ConnectionPool(host="localhost", port=6379, db=0)
_local_redis = redis.StrictRedis(connection_pool=_local_pool)
utc_distance = (datetime.datetime.now() - datetime.datetime.utcnow()).seconds
UA_list = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
"Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
"Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
"Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
"Opera/9.80 (Windows NT 5.1; U; zh-cn) Presto/2.9.168 Version/11.50",
"Mozilla/5.0 (Windows NT 5.1; rv:5.0) Gecko/20100101 Firefox/5.0",
"Mozilla/5.0 (Windows NT 5.2) AppleWebKit/534.30 (KHTML, like Gecko) Chrome/12.0.742.122 Safari/534.30",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; 360SE)",
"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 SE 2.X MetaSr 1.0",
"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0)",
"Opera/9.27 (Windows NT 5.2; U; zh-cn)",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 OPR/26.0.1656.60",
"Opera/8.0 (Windows NT 5.1; U; en)",
"Mozilla/5.0 (Windows NT 5.1; U; en; rv:1.8.1) Gecko/20061208 Firefox/2.0.0 Opera 9.50",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:63.0) Gecko/20100101 Firefox/63.0"
)
def ncr2unicode(string):
return html.unescape(string)
class LazyProperty(object):
"""
LazyProperty
explain: http://www.spiderpy.cn/blog/5/
"""
def __init__(self, func):
self.func = func
def __get__(self, instance, owner):
if instance is None:
return self
else:
value = self.func(instance)
setattr(instance, self.func.__name__, value)
return value
def datetime2timestamp(dt):
"""
datetime2timestamp("2016-1-1 12:12:12") -> 1471234567
"""
if "T" in dt:
struct_time = time.strptime(dt, "%Y-%m-%dT%H:%M:%S")
return int(time.mktime(struct_time)) + utc_distance
else:
struct_time = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
return int(time.mktime(struct_time))
def timestamp2datetime(value):
"""
timestamp2datetime(1471234567) -> "2016-1-1 12:12:12"
"""
structure = time.localtime(value)
dt = time.strftime('%Y-%m-%d %H:%M:%S', structure)
return dt
def timestamp2utc_datetime(value):
structure = time.gmtime(value)
dt = time.strftime('%Y-%m-%dT%H:%M:%S', structure)
return dt
def formatter_date_utc(before, structure, is_utc=False):
if is_utc:
ts = time.mktime(time.strptime(before, structure))
else:
ts = time.mktime(time.strptime(before, structure)) - 28800
dt = datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S")
return dt
def url_encode_plus(s, safe='', encoding=None, errors=None):
return quote_plus(s, safe=safe, encoding=encoding, errors=errors)
def url_encode(s, safe='/', encoding=None, errors=None):
return quote(s, safe=safe, encoding=encoding, errors=errors)
def doc2header(doc):
header = {}
for line in doc.split('\n'):
k, v = line.split(': ')
header[k] = v
return header
def lxml_html(text, parser=None, base_url=None):
try:
return HTML(text, parser=parser, base_url=base_url)
except XMLSyntaxError:
return
def lxml_xml(text, parser=None, base_url=None):
return XML(text, parser=parser, base_url=base_url)
def etree_tostring(element_or_tree):
return tostring(element_or_tree)
def get_ts(k=10):
return "%d" % int(time.time() * (10 ** (k - 10)))
def get_ts_int(k=10):
return int(time.time() * (10 ** (k - 10)))
def check_json_format(raw_msg):
"""
用于判断一个字符串是否符合Json格式
"""
if isinstance(raw_msg, bytes) or isinstance(raw_msg, str): # 首先判断变量是否为字符串
try:
json.loads(raw_msg)
except Exception:
return False
return True
else:
return False
def b64decode(s):
return base64.b64decode(s).decode('utf8')
def b64encode(s):
return base64.b64encode(s.encode('utf8')).decode('utf8')
def random_ua():
return random.choice(UA_list)
# def random_ua_2():
# return ua.random
def gen_chars(size, charset=string.ascii_letters + string.digits, prefix="", postfix=""):
"""
从charset中产生长度为size的字符串
:param size: 字符串长度
:param charset: 字符集参数
:param prefix: 前缀
:param postfix: 后缀
:return:
"""
return prefix + "".join((random.choice(charset) for _ in range(size))) + postfix
def gen_sep_chars(alist, sep="-", charset=string.ascii_letters + string.digits, prefix="", postfix=""):
"""
从alist中获取各部分长度并产生字符串,并以sep为分隔符,将它们连接起来
:param alist: 长度列表
:param sep: 分隔符
:param charset: 字符集
:param prefix: 前缀
:param postfix: 后缀
:return:
"""
return prefix + sep.join((gen_chars(size, charset) for size in alist)) + postfix
def random_choice(s):
return random.choice(s)
def random_int(s, e):
return random.randint(s, e)
def random_float():
return random.random()
def get_token(key):
return _local_redis.get(key)
def rand_token(platform):
token_key = "tas_token_{platform}".format(platform=platform)
val_str = _local_redis.srandmember(token_key)
return json.loads(val_str)
def format_date(raw_date):
year = datetime.datetime.now().year
month = datetime.datetime.now().month
day = datetime.datetime.now().day
hour = datetime.datetime.now().hour
minute = datetime.datetime.now().minute
second = datetime.datetime.now().second
# b_day, b_hours, b_mins, b_secs = 0, 0, 0, 0
if '秒' in raw_date:
b_secs = int(re.search(r'\d+', raw_date).group(0))
time_tuple = (year, month, day, hour, minute, second - b_secs, 0, 0, 0)
ts = int(time.mktime(time_tuple))
dt = timestamp2datetime(ts)
elif '分' in raw_date:
b_mins = int(re.search(r'\d+', raw_date).group(0))
time_tuple = (year, month, day, hour, minute - b_mins, second, 0, 0, 0)
ts = int(time.mktime(time_tuple))
dt = timestamp2datetime(ts)
else:
dt = raw_date[:10] + " " + raw_date[10:]
ts = datetime2timestamp(dt)
return dt, ts
# coding=utf8
import sys
import os
from threading import Lock
import unittest
import time
from collections import OrderedDict
import logging
from logging import handlers
from concurrent_log_handler import ConcurrentRotatingFileHandler
os_name = os.name
formatter_dict = {
1: logging.Formatter(
'日志时间【%(asctime)s】 - 日志名称【%(name)s】 - 文件【%(filename)s】 - 第【%(lineno)d】行 - 日志等级【%(levelname)s】 - 日志信息【%(message)s】',
"%Y-%m-%d %H:%M:%S"),
2: logging.Formatter(
'%(asctime)s - %(name)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s',
"%Y-%m-%d %H:%M:%S"),
3: logging.Formatter(
'%(asctime)s - %(name)s - 【 File "%(pathname)s", line %(lineno)d, in %(funcName)s 】 - %(levelname)s - %(message)s',
"%Y-%m-%d %H:%M:%S"), # 一个模仿traceback异常的可跳转到打印日志地方的模板
4: logging.Formatter(
'%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s - File "%(pathname)s", line %(lineno)d ',
"%Y-%m-%d %H:%M:%S"), # 这个也支持日志跳转
5: logging.Formatter(
'%(asctime)s - %(name)s - "%(pathname)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(message)s',
"%Y-%m-%d %H:%M:%S"), # 我认为的最好的模板,推荐
6: logging.Formatter('%(name)s - %(asctime)-15s - %(filename)s - %(lineno)d - %(levelname)s: %(message)s',
"%Y-%m-%d %H:%M:%S"),
7: logging.Formatter('%(levelname)s - %(filename)s - %(lineno)d - %(message)s'), # 一个只显示简短文件名和所处行数的日志模板
}
# noinspection PyMissingOrEmptyDocstring
class LogLevelException(Exception):
def __init__(self, log_level):
err = '设置的日志级别是 {0}, 设置错误,请设置为1 2 3 4 5 范围的数字'.format(log_level)
Exception.__init__(self, err)
# noinspection PyMissingOrEmptyDocstring
class MongoHandler(logging.Handler):
"""
一个mongodb的log handler,支持日志按loggername创建不同的集合写入mongodb中
"""
# msg_pattern = re.compile('(\d+-\d+-\d+ \d+:\d+:\d+) - (\S*?) - (\S*?) - (\d+) - (\S*?) - ([\s\S]*)')
def __init__(self, mongo_url, mongo_database='logs'):
"""
:param mongo_url: mongo连接
:param mongo_database: 保存日志的数据库,默认使用logs数据库
"""
logging.Handler.__init__(self)
import pymongo
mongo_client = pymongo.MongoClient(mongo_url)
self.mongo_db = mongo_client.get_database(mongo_database)
def emit(self, record):
# noinspection PyBroadException,PyPep8
try:
"""以下使用解析日志模板的方式提取出字段"""
# msg = self.format(record)
# logging.LogRecord
# msg_match = self.msg_pattern.search(msg)
# log_info_dict = {'time': msg_match.group(1),
# 'name': msg_match.group(2),
# 'file_name': msg_match.group(3),
# 'line_no': msg_match.group(4),
# 'log_level': msg_match.group(5),
# 'detail_msg': msg_match.group(6),
# }
level_str = None
if record.levelno == 10:
level_str = 'DEBUG'
elif record.levelno == 20:
level_str = 'INFO'
elif record.levelno == 30:
level_str = 'WARNING'
elif record.levelno == 40:
level_str = 'ERROR'
elif record.levelno == 50:
level_str = 'CRITICAL'
log_info_dict = OrderedDict()
log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S')
log_info_dict['name'] = record.name
log_info_dict['file_path'] = record.pathname
log_info_dict['file_name'] = record.filename
log_info_dict['func_name'] = record.funcName
log_info_dict['line_no'] = record.lineno
log_info_dict['log_level'] = level_str
log_info_dict['detail_msg'] = record.msg
col = self.mongo_db.get_collection(record.name)
col.insert_one(log_info_dict)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)
class ColorHandler0(logging.Handler):
"""彩色日志handler,根据不同级别的日志显示不同颜色"""
bule = 96 if os_name == 'nt' else 36
yellow = 93 if os_name == 'nt' else 33
def __init__(self):
logging.Handler.__init__(self)
self.formatter_new = logging.Formatter(
'%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s',
"%Y-%m-%d %H:%M:%S")
# 对控制台日志单独优化显示和跳转,单独对字符串某一部分使用特殊颜色,主要用于第四种模板,以免filehandler和mongohandler中带有\033
@classmethod
def _my_align(cls, string, length):
if len(string) > length * 2:
return string
custom_length = 0
for w in string:
custom_length += 1 if cls._is_ascii_word(w) else 2
if custom_length < length:
place_length = length - custom_length
string += ' ' * place_length
return string
@staticmethod
def _is_ascii_word(w):
if ord(w) < 128:
return True
def emit(self, record):
"""
30 40 黑色
31 41 红色
32 42 绿色
33 43 黃色
34 44 蓝色
35 45 紫红色
36 46 青蓝色
37 47 白色
:type record:logging.LogRecord
:return:
"""
if self.formatter is formatter_dict[4] or self.formatter is self.formatter_new:
self.formatter = self.formatter_new
if os.name == 'nt':
self.__emit_for_fomatter4_pycahrm(record) # 使用模板4并使用pycharm时候
else:
self.__emit_for_fomatter4_linux(record) # 使用模板4并使用linux时候
else:
self.__emit(record) # 其他模板
def __emit_for_fomatter4_linux(self, record):
"""
当使用模板4针对linxu上的终端打印优化显示
:param record:
:return:
"""
# noinspection PyBroadException,PyPep8
try:
msg = self.format(record)
file_formatter = ' ' * 10 + '\033[7mFile "%s", line %d\033[0m' % (record.pathname, record.lineno)
if record.levelno == 10:
print('\033[0;32m%s' % self._my_align(msg, 150) + file_formatter)
elif record.levelno == 20:
print('\033[0;34m%s' % self._my_align(msg, 150) + file_formatter)
elif record.levelno == 30:
print('\033[0;33m%s' % self._my_align(msg, 150) + file_formatter)
elif record.levelno == 40:
print('\033[0;35m%s' % self._my_align(msg, 150) + file_formatter)
elif record.levelno == 50:
print('\033[0;31m%s' % self._my_align(msg, 150) + file_formatter)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)
def __emit_for_fomatter4_pycahrm(self, record):
"""
当使用模板4针对pycahrm的打印优化显示
:param record:
:return:
"""
# \033[0;93;107mFile "%(pathname)s", line %(lineno)d, in %(funcName)s\033[0m
# noinspection PyBroadException
try:
msg = self.format(record)
# for_linux_formatter = ' ' * 10 + '\033[7m;File "%s", line %d\033[0m' % (record.pathname, record.lineno)
file_formatter = ' ' * 10 + '\033[0;93;107mFile "%s", line %d\033[0m' % (record.pathname, record.lineno)
if record.levelno == 10:
print('\033[0;32m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 绿色
elif record.levelno == 20:
print('\033[0;36m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 青蓝色
elif record.levelno == 30:
print('\033[0;92m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 蓝色
elif record.levelno == 40:
print('\033[0;35m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 紫红色
elif record.levelno == 50:
print('\033[0;31m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 血红色
except (KeyboardInterrupt, SystemExit):
raise
except: # NOQA
self.handleError(record)
def __emit(self, record):
# noinspection PyBroadException
try:
msg = self.format(record)
if record.levelno == 10:
print('\033[0;32m%s\033[0m' % msg) # 绿色
elif record.levelno == 20:
print('\033[0;%sm%s\033[0m' % (self.bule, msg)) # 青蓝色 36 96
elif record.levelno == 30:
print('\033[0;%sm%s\033[0m' % (self.yellow, msg))
elif record.levelno == 40:
print('\033[0;35m%s\033[0m' % msg) # 紫红色
elif record.levelno == 50:
print('\033[0;31m%s\033[0m' % msg) # 血红色
except (KeyboardInterrupt, SystemExit):
raise
except: # NOQA
self.handleError(record)
class ColorHandler(logging.Handler):
"""
A handler class which writes logging records, appropriately formatted,
to a stream. Note that this class does not close the stream, as
sys.stdout or sys.stderr may be used.
"""
terminator = '\n'
bule = 96 if os_name == 'nt' else 36
yellow = 93 if os_name == 'nt' else 33
def __init__(self, stream=None):
"""
Initialize the handler.
If stream is not specified, sys.stderr is used.
"""
logging.Handler.__init__(self)
if stream is None:
stream = sys.stdout # stderr无彩。
self.stream = stream
def flush(self):
"""
Flushes the stream.
"""
self.acquire()
try:
if self.stream and hasattr(self.stream, "flush"):
self.stream.flush()
finally:
self.release()
def emit(self, record):
"""
Emit a record.
If a formatter is specified, it is used to format the record.
The record is then written to the stream with a trailing newline. If
exception information is present, it is formatted using
traceback.print_exception and appended to the stream. If the stream
has an 'encoding' attribute, it is used to determine how to do the
output to the stream.
"""
# noinspection PyBroadException
try:
msg = self.format(record)
stream = self.stream
if record.levelno == 10:
msg_color = ('\033[0;32m%s\033[0m' % msg) # 绿色
elif record.levelno == 20:
msg_color = ('\033[0;%sm%s\033[0m' % (self.bule, msg)) # 青蓝色 36 96
elif record.levelno == 30:
msg_color = ('\033[0;%sm%s\033[0m' % (self.yellow, msg))
elif record.levelno == 40:
msg_color = ('\033[0;35m%s\033[0m' % msg) # 紫红色
elif record.levelno == 50:
msg_color = ('\033[0;31m%s\033[0m' % msg) # 血红色
else:
msg_color = msg
# print(msg_color,'***************')
stream.write(msg_color)
stream.write(self.terminator)
self.flush()
except Exception:
self.handleError(record)
def __repr__(self):
level = logging.getLevelName(self.level)
name = getattr(self.stream, 'name', '')
if name:
name += ' '
return '<%s %s(%s)>' % (self.__class__.__name__, name, level)
class CompatibleSMTPSSLHandler(handlers.SMTPHandler):
"""
官方的SMTPHandler不支持SMTP_SSL的邮箱,这个可以两个都支持,并且支持邮件发送频率限制
"""
def __init__(self, mailhost, fromaddr, toaddrs: tuple, subject,
credentials=None, secure=None, timeout=5.0, is_use_ssl=True, mail_time_interval=0):
"""
:param mailhost:
:param fromaddr:
:param toaddrs:
:param subject:
:param credentials:
:param secure:
:param timeout:
:param is_use_ssl:
:param mail_time_interval: 发邮件的时间间隔,可以控制日志邮件的发送频率,为0不进行频率限制控制,如果为60,代表1分钟内最多发送一次邮件
"""
# noinspection PyCompatibility
super().__init__(mailhost, fromaddr, toaddrs, subject,
credentials, secure, timeout)
self._is_use_ssl = is_use_ssl
self._current_time = 0
self._time_interval = mail_time_interval
self._msg_map = dict() # 是一个内容为键时间为值得映射
self._lock = Lock()
def emit0(self, record: logging.LogRecord):
"""
不用这个判断内容
"""
from threading import Thread
if sys.getsizeof(self._msg_map) > 10 * 1000 * 1000:
self._msg_map.clear()
if record.msg not in self._msg_map or time.time() - self._msg_map[record.msg] > self._time_interval:
self._msg_map[record.msg] = time.time()
# print('发送邮件成功')
Thread(target=self.__emit, args=(record,)).start()
else:
print(f'[log_manager.py] 邮件发送太频繁,此次不发送这个邮件内容: {record.msg} ')
def emit(self, record: logging.LogRecord):
"""
Emit a record.
Format the record and send it to the specified addressees.
"""
from threading import Thread
with self._lock:
if time.time() - self._current_time > self._time_interval:
self._current_time = time.time()
Thread(target=self.__emit, args=(record,)).start()
else:
print(f'[log_manager.py] 邮件发送太频繁,此次不发送这个邮件内容: {record.msg} ')
def __emit(self, record):
# noinspection PyBroadException
try:
import smtplib
from email.message import EmailMessage
import email.utils
t_start = time.time()
port = self.mailport
if not port:
port = smtplib.SMTP_PORT
smtp = smtplib.SMTP_SSL(self.mailhost, port, timeout=self.timeout) if self._is_use_ssl else smtplib.SMTP(
self.mailhost, port, timeout=self.timeout)
msg = EmailMessage()
msg['From'] = self.fromaddr
msg['To'] = ','.join(self.toaddrs)
msg['Subject'] = self.getSubject(record)
msg['Date'] = email.utils.localtime()
msg.set_content(self.format(record))
if self.username:
if self.secure is not None:
smtp.ehlo()
smtp.starttls(*self.secure)
smtp.ehlo()
smtp.login(self.username, self.password)
smtp.send_message(msg)
smtp.quit()
# noinspection PyPep8
print(
f'[log_manager.py] {time.strftime("%H:%M:%S",time.localtime())} 发送邮件给 {self.toaddrs} 成功,'
f'用时{round(time.time() - t_start,2)} ,发送的内容是--> {record.msg} \033[0;35m!!!请去邮箱检查,可能在垃圾邮件中\033[0m')
except Exception as e:
# self.handleError(record)
print(
f'[log_manager.py] {time.strftime("%H:%M:%S",time.localtime())} \033[0;31m !!!!!! 邮件发送失败,原因是: {e} \033[0m')
# noinspection PyTypeChecker
def get_logs_dir_by_folder_name(folder_name='/app/'):
"""获取app文件夹的路径,如得到这个路径
D:/coding/hotel_fares/app
如果没有app文件夹,就在当前文件夹新建
"""
three_parts_str_tuple = (os.path.dirname(__file__).replace('\\', '/').partition(folder_name))
# print(three_parts_str_tuple)
if three_parts_str_tuple[1]:
return three_parts_str_tuple[0] + three_parts_str_tuple[1] + 'logs/' # noqa
else:
return three_parts_str_tuple[0] + '/logs/' # NOQA
def get_logs_dir_by_disk_root():
"""
返回磁盘根路径下的pythonlogs文件夹,当使用文件日志时候自动创建这个文件夹。
:return:
"""
from pathlib import Path
return str(Path(Path(__file__).absolute().root) / Path('pythonlogs'))
# noinspection PyMissingOrEmptyDocstring,PyPep8
class LogManager(object):
"""
一个日志管理类,用于创建logger和添加handler,支持将日志打印到控制台打印和写入日志文件和mongodb和邮件。
"""
logger_name_list = []
logger_list = []
def __init__(self, logger_name=None):
"""
:param logger_name: 日志名称,当为None时候创建root命名空间的日志,一般不要传None,除非你确定需要这么做
"""
self._logger_name = logger_name
self.logger = logging.getLogger(logger_name)
self._logger_level = None
self._is_add_stream_handler = None
self._do_not_use_color_handler = None
self._log_path = None
self._log_filename = None
self._log_file_size = None
self._mongo_url = None
self._formatter = None
# 此处可以使用*args ,**kwargs减少很多参数,但为了pycharm更好的自动智能补全提示放弃这么做
@classmethod
def bulid_a_logger_with_mail_handler(cls, logger_name, log_level_int=10, *, is_add_stream_handler=True,
do_not_use_color_handler=False, log_path=get_logs_dir_by_disk_root(),
log_filename=None,
log_file_size=100, mongo_url=None,
formatter_template=5, mailhost: tuple = ('smtp.mxhichina.com', 465),
fromaddr: str = 'xxx@oo.com',
toaddrs: tuple = ('yy@oo.com', 460,'zzg@oo.com', 'xxxx@qq.com','ttttt@qq.com', 'hhh@qq.com', 'mmmm@oo.com'),
subject: str = '日志报警测试',
credentials: tuple = ('xxx@oo.com', '123456789'),
secure=None, timeout=5.0, is_use_ssl=True, mail_time_interval=60):
"""
创建一个附带邮件handler的日志
:param logger_name:
:param log_level_int: 可以用1 2 3 4 5 ,用可以用官方logging模块的正规的10 20 30 40 50,兼容。
:param is_add_stream_handler:
:param do_not_use_color_handler:
:param log_path:
:param log_filename:
:param log_file_size:
:param mongo_url:
:param formatter_template:
:param mailhost:
:param fromaddr:
:param toaddrs:
:param subject:
:param credentials:
:param secure:
:param timeout:
:param is_use_ssl:
:param mail_time_interval: 邮件的频率控制,为0不限制,如果为100,代表100秒内相同内容的邮件最多发送一次邮件
:return:
"""
if log_filename is None:
log_filename = f'{logger_name}.log'
logger = cls(logger_name).get_logger_and_add_handlers(log_level_int=log_level_int,
is_add_stream_handler=is_add_stream_handler,
do_not_use_color_handler=do_not_use_color_handler,
log_path=log_path, log_filename=log_filename,
log_file_size=log_file_size, mongo_url=mongo_url,
formatter_template=formatter_template, )
smtp_handler = CompatibleSMTPSSLHandler(mailhost, fromaddr,
toaddrs,
subject,
credentials,
secure,
timeout,
is_use_ssl,
mail_time_interval,
)
log_level_int = log_level_int * 10 if log_level_int < 10 else log_level_int
smtp_handler.setLevel(log_level_int)
smtp_handler.setFormatter(formatter_dict[formatter_template])
if not cls.__judge_logger_contain_handler_class(logger, CompatibleSMTPSSLHandler):
if logger.name == 'root':
for logger_x in cls.logger_list:
for hdlr in logger_x.handlers:
if isinstance(hdlr, CompatibleSMTPSSLHandler):
logger_x.removeHandler(hdlr)
logger.addHandler(smtp_handler)
return logger
# 加*是为了强制在调用此方法时候使用关键字传参,如果以位置传参强制报错,因为此方法后面的参数中间可能以后随时会增加更多参数,造成之前的使用位置传参的代码参数意义不匹配。
def get_logger_and_add_handlers(self, log_level_int: int = 10, *, is_add_stream_handler=True,
do_not_use_color_handler=False, log_path=get_logs_dir_by_disk_root(),
log_filename=None, log_file_size=100,
mongo_url=None,
formatter_template=5):
"""
:param log_level_int: 日志输出级别,设置为 1 2 3 4 5,分别对应原生logging.DEBUG(10),logging.INFO(20),logging.WARNING(30),logging.ERROR(40),logging.CRITICAL(50)级别,现在可以直接用10 20 30 40 50了,兼容了。
:param is_add_stream_handler: 是否打印日志到控制台
:param do_not_use_color_handler :是否禁止使用color彩色日志
:param log_path: 设置存放日志的文件夹路径
:param log_filename: 日志的名字,仅当log_path和log_filename都不为None时候才写入到日志文件。
:param log_file_size :日志大小,单位M,默认10M
:param mongo_url : mongodb的连接,为None时候不添加mongohandler
:param formatter_template :日志模板,1为formatter_dict的详细模板,2为简要模板,5为最好模板
:type log_level_int :int
:type is_add_stream_handler :bool
:type log_path :str
:type log_filename :str
:type mongo_url :str
:type log_file_size :int
"""
self._logger_level = log_level_int * 10 if log_level_int < 10 else log_level_int
self._is_add_stream_handler = is_add_stream_handler
self._do_not_use_color_handler = do_not_use_color_handler
self._log_path = log_path
self._log_filename = log_filename
self._log_file_size = log_file_size
self._mongo_url = mongo_url
self._formatter = formatter_dict[formatter_template]
self.__set_logger_level()
self.__add_handlers()
self.logger_name_list.append(self._logger_name)
self.logger_list.append(self.logger)
return self.logger
def get_logger_without_handlers(self):
"""返回一个不带hanlers的logger"""
return self.logger
# noinspection PyMethodMayBeStatic,PyMissingOrEmptyDocstring
def look_over_all_handlers(self):
print(f'{self._logger_name}名字的日志的所有handlers是--> {self.logger.handlers}')
def remove_all_handlers(self):
for hd in self.logger.handlers:
self.logger.removeHandler(hd)
def remove_handler_by_handler_class(self, handler_class: type):
"""
去掉指定类型的handler
:param handler_class:logging.StreamHandler,ColorHandler,MongoHandler,ConcurrentRotatingFileHandler,MongoHandler,CompatibleSMTPSSLHandler的一种
:return:
"""
if handler_class not in (logging.StreamHandler, ColorHandler, MongoHandler, ConcurrentRotatingFileHandler, MongoHandler, CompatibleSMTPSSLHandler):
raise TypeError('设置的handler类型不正确')
for handler in self.logger.handlers:
if isinstance(handler, handler_class):
self.logger.removeHandler(handler)
def __set_logger_level(self):
self.logger.setLevel(self._logger_level)
def __remove_handlers_from_other_logger_when_logger_name_is_none(self, handler_class):
"""
当logger name为None时候需要移出其他logger的handler,否则重复记录日志
:param handler_class: handler类型
:return:
"""
if self._logger_name is None:
for logger in self.logger_list:
for hdlr in logger.handlers:
if isinstance(hdlr, handler_class):
logger.removeHandler(hdlr)
@staticmethod
def __judge_logger_contain_handler_class(logger: logging.Logger, handler_class):
for h in logger.handlers + logging.getLogger().handlers:
if isinstance(h, (handler_class,)):
return True
def __add_handlers(self):
if self._is_add_stream_handler:
if not self.__judge_logger_contain_handler_class(self.logger,
ColorHandler): # 主要是阻止给logger反复添加同种类型的handler造成重复记录
self.__remove_handlers_from_other_logger_when_logger_name_is_none(ColorHandler)
self.__add_stream_handler()
if all([self._log_path, self._log_filename]):
if not self.__judge_logger_contain_handler_class(self.logger, ConcurrentRotatingFileHandler):
self.__remove_handlers_from_other_logger_when_logger_name_is_none(ConcurrentRotatingFileHandler)
self.__add_file_handler()
if self._mongo_url:
if not self.__judge_logger_contain_handler_class(self.logger, MongoHandler):
self.__remove_handlers_from_other_logger_when_logger_name_is_none(MongoHandler)
self.__add_mongo_handler()
def __add_mongo_handler(self):
"""写入日志到mongodb"""
mongo_handler = MongoHandler(self._mongo_url)
mongo_handler.setLevel(self._logger_level)
mongo_handler.setFormatter(self._formatter)
self.logger.addHandler(mongo_handler)
def __add_stream_handler(self):
"""
日志显示到控制台
"""
# stream_handler = logging.StreamHandler()
stream_handler = ColorHandler() if not self._do_not_use_color_handler else logging.StreamHandler() # 不使用streamhandler,使用自定义的彩色日志
stream_handler.setLevel(self._logger_level)
stream_handler.setFormatter(self._formatter)
self.logger.addHandler(stream_handler)
def __add_file_handler(self):
"""
日志写入日志文件
"""
if not os.path.exists(self._log_path):
os.makedirs(self._log_path)
log_file = os.path.join(self._log_path, self._log_filename)
rotate_file_handler = None
if os_name == 'nt':
# windows下用这个,非进程安全
rotate_file_handler = ConcurrentRotatingFileHandler(log_file, maxBytes=self._log_file_size * 1024 * 1024,
backupCount=3,
encoding="utf-8")
if os_name == 'posix':
# linux下可以使用ConcurrentRotatingFileHandler,进程安全的日志方式
rotate_file_handler = ConcurrentRotatingFileHandler(log_file, maxBytes=self._log_file_size * 1024 * 1024,
backupCount=3, encoding="utf-8")
rotate_file_handler.setLevel(self._logger_level)
rotate_file_handler.setFormatter(self._formatter)
self.logger.addHandler(rotate_file_handler)
class LoggerMixin(object):
subclass_logger_dict = {}
@property
def logger(self):
if self.__class__.__name__ + '1' not in self.subclass_logger_dict:
logger_var = LogManager(self.__class__.__name__).get_logger_and_add_handlers()
self.subclass_logger_dict[self.__class__.__name__ + '1'] = logger_var
return logger_var
else:
return self.subclass_logger_dict[self.__class__.__name__ + '1']
@property
def logger_with_file(self):
if self.__class__.__name__ + '2' not in self.subclass_logger_dict:
logger_var = LogManager(type(self).__name__).get_logger_and_add_handlers(log_filename=type(self).__name__ + '.log', log_file_size=50)
self.subclass_logger_dict[self.__class__.__name__ + '2'] = logger_var
return logger_var
else:
return self.subclass_logger_dict[self.__class__.__name__ + '2']
simple_logger = LogManager('simple').get_logger_and_add_handlers()
defaul_logger = LogManager('hotel').get_logger_and_add_handlers(do_not_use_color_handler=True, formatter_template=7)
file_logger = LogManager('hotelf').get_logger_and_add_handlers(do_not_use_color_handler=True,
log_filename='hotel_' + time.strftime("%Y-%m-%d",
time.localtime()) + ".log",
formatter_template=7)
# noinspection PyMethodMayBeStatic,PyNestedDecorators,PyArgumentEqualDefault
class _Test(unittest.TestCase):
# noinspection PyMissingOrEmptyDocstring
@classmethod
def tearDownClass(cls):
"""
"""
time.sleep(1)
# @unittest.skip
def test_repeat_add_handlers_(self):
"""测试重复添加handlers"""
LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
test_log = LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
print('下面这一句不会重复打印四次和写入日志四次')
time.sleep(1)
test_log.debug('这一句不会重复打印四次和写入日志四次')
@unittest.skip
def test_get_logger_without_hanlders(self):
"""测试没有handlers的日志"""
log = LogManager('test2').get_logger_without_handlers()
print('下面这一句不会被打印')
time.sleep(1)
log.info('这一句不会被打印')
@unittest.skip
def test_add_handlers(self):
"""这样可以在具体的地方任意写debug和info级别日志,只需要在总闸处规定级别就能过滤,很方便"""
LogManager('test3').get_logger_and_add_handlers(2)
log1 = LogManager('test3').get_logger_without_handlers()
print('下面这一句是info级别,可以被打印出来')
time.sleep(1)
log1.info('这一句是info级别,可以被打印出来')
print('下面这一句是debug级别,不能被打印出来')
time.sleep(1)
log1.debug('这一句是debug级别,不能被打印出来')
@unittest.skip
def test_only_write_log_to_file(self): # NOQA
"""只写入日志文件"""
log5 = LogManager('test5').get_logger_and_add_handlers(20)
log6 = LogManager('test6').get_logger_and_add_handlers(is_add_stream_handler=False, log_filename='test6.log')
print('下面这句话只写入文件')
log5.debug('这句话只写入文件')
log6.debug('这句话只写入文件')
@unittest.skip
def test_get_app_logs_dir(self): # NOQA
print(get_logs_dir_by_folder_name())
print(get_logs_dir_by_disk_root())
@unittest.skip
def test_none(self):
# noinspection PyUnusedLocal
log1 = LogManager('log1').get_logger_and_add_handlers()
LogManager().get_logger_and_add_handlers()
LogManager().get_logger_and_add_handlers()
log1 = LogManager('log1').get_logger_and_add_handlers()
LogManager().get_logger_and_add_handlers()
LogManager('log1').get_logger_and_add_handlers(log_filename='test_none.log')
log1.debug('打印几次?')
@unittest.skip
def test_formater(self):
logger2 = LogManager('test_formater2').get_logger_and_add_handlers(formatter_template=6)
logger2.debug('测试日志模板2')
logger5 = LogManager('test_formater5').get_logger_and_add_handlers(formatter_template=5)
logger5.error('测试日志模板5')
defaul_logger.debug('dddddd')
file_logger.info('ffffff')
@unittest.skip
def test_bulid_a_logger_with_mail_handler(self):
"""
测试日志发送到邮箱中
:return:
"""
logger = LogManager.bulid_a_logger_with_mail_handler('mail_logger_name', mail_time_interval=10, toaddrs=(
'yyy@qq.com', 'zz@oo.com', ))
# LogManager.bulid_a_logger_with_mail_handler('mail_logger_name', mail_time_interval=10)
# LogManager.bulid_a_logger_with_mail_handler(None, log_filename='mail.log', mail_time_interval=10)
for _ in range(100):
logger.warning('啦啦啦啦啦')
logger.warning('测试邮件日志的内容。。。。')
time.sleep(2)
@unittest.skip
def test_remove_handler(self):
logger = LogManager('test13').get_logger_and_add_handlers()
logger.debug('去掉coloerhandler前')
LogManager('test13').remove_handler_by_handler_class(ColorHandler)
logger.debug('去掉coloerhandler后,此记录不会被打印')
if __name__ == "__main__":
unittest.main()
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import random
import aiohttp
from aiohttp.client import _RequestContextManager
class Vendor:
dobel_t3 = "dobel_t3"
dobel_passwd = 'oK97IIl298'
dobel_host_t3 = "http://http-proxy-t3.dobel.cn:9180"
max_dobel = 2
min_dobel = 1
dobel_t1 = "dobel_t1"
dobel_host_t1 = "http://http-proxy-t1.dobel.cn:9180"
t1_auth_name = 'LIEXINHTT3'
class DobelProxy:
@staticmethod
def get_proxy_t3():
i = random.randint(Vendor.min_dobel, Vendor.max_dobel)
while True:
while i <= Vendor.max_dobel:
auth_name = "LIEXINHTT%d" % (i,)
yield Vendor.dobel_host_t3, aiohttp.BasicAuth(auth_name, Vendor.dobel_passwd)
i += 1
i = Vendor.min_dobel
@staticmethod
def get_proxy_t1():
while True:
yield Vendor.dobel_host_t1, aiohttp.BasicAuth(Vendor.t1_auth_name, Vendor.dobel_passwd)
class AbuyunProxy:
@staticmethod
def get_proxy_pro():
while 1:
yield 'http://http-pro.abuyun.com:9010', aiohttp.BasicAuth('HI6MSKA0807328TP', '68D3A39E47889817')
class ProxyManger(object):
def __init__(self):
super().__init__()
self.vendor_generator_map = {
"dobel_t3": self.dobel_t3, # 套餐3 200并发 随机ip 每个请求随机分配IP
"dobel_t1": self.dobel_t1, # 套餐1 10并发 IP持续使用时间1~2分钟 支持自主切换IP
"abuyun_pro": self.aabuyun_pro # abuyun pro test
}
@property
def dobel_t3(self):
return DobelProxy.get_proxy_t3()
@property
def dobel_t1(self):
return DobelProxy.get_proxy_t1()
@property
def aabuyun_pro(self):
return AbuyunProxy.get_proxy_pro()
def get_next(self, vendor, keep_same_proxy, tornado_instance):
if keep_same_proxy and hasattr(tornado_instance, "vendor_previous_map"):
return list(tornado_instance.vendor_previous_map.values())[0]
if vendor is None:
vendor = self.get_vendor()
elif vendor not in self.vendor_generator_map:
raise KeyError("Unknown vendor: %s" % (str(vendor),))
values = next(self.vendor_generator_map[vendor])
tornado_instance.vendor_previous_map = {vendor: values}
return values
@staticmethod
def get_vendor():
return Vendor.dobel_t3
class MyProxyContextManger(_RequestContextManager):
def __init__(self, logging, coro):
super().__init__(coro)
self.logging = logging
async def __aenter__(self):
# begin_time = time.time()
self._resp = await self._coro
# end_time = time.time()
# self.logging.error("proxy request cost: %.3f seconds" % (end_time - begin_time,), StatusCode.ProxyRequest)
return self._resp
proxy_manger = ProxyManger()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment