Commit 7cacf5cb by lichenggang

Initial commit

__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
logs/*
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# PyCharm
.idea/
workspace.xml
.idea/workspace.xml
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging
import traceback
from queue import Queue, Empty
from threading import Thread

__all__ = ["fetcher"]


class Downloader(Thread):
    def __init__(self, caller, task_queue, token_list, index):
        Thread.__init__(self)
        self.daemon = True
        self.caller = caller
        self.task_queue = task_queue
        self.token_list = token_list
        self.index = index  # worker index, passed to the caller as `num`

    def run(self):
        while 1:
            try:
                account_info = self.task_queue.get_nowait()
                atoken = self.caller(*account_info, num=self.index)
                if isinstance(atoken, list):
                    self.token_list.extend(atoken)
                elif atoken:
                    self.token_list.append(atoken)
            except Empty:
                break
            except Exception:
                logging.error(traceback.format_exc())


def fetcher(caller, account_list, thread_num):
    task_queue = Queue()
    for account in account_list:
        task_queue.put(account)
    threads = []
    token_list = []
    for index in range(thread_num):
        t = Downloader(caller, task_queue, token_list, index)
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return token_list

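A minimal usage sketch for the fetcher helper above (the demo_login caller and its account rows are illustrative, not part of this commit): fetcher unpacks each account row as positional arguments and passes the worker's index as the `num` keyword.

from fetcher import fetcher

def demo_login(username, password, num=0):
    # perform the real login here; return a token string, a list of tokens,
    # or a falsy value on failure
    return "token-for-%s" % username

accounts = [("alice", "pw1"), ("bob", "pw2")]
tokens = fetcher(demo_login, accounts, thread_num=2)
print(tokens)  # e.g. ['token-for-alice', 'token-for-bob'] (order may vary)
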
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging
import os
from logging.handlers import RotatingFileHandler

loggerLevel = logging.INFO
root_path = os.getcwd().split('tas_token_server')[0] + 'tas_token_server'
log_dir = root_path + '/logs'
# log_dir = "logs"
console_formatter = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'
json_formatter = '%(message)s'
logger_dict = dict()


def create_logger(log_name, log_type):
    g_logger = logging.getLogger(log_name)
    # rt = log_name.split('_')[0]
    log_path = log_dir
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    logfile = log_name + ".log"
    log_file = "%s/%s" % (log_path, logfile)
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(console_formatter))
    handler = RotatingFileHandler(log_file, maxBytes=2 * 1024 * 1024, backupCount=1)
    fmt = json_formatter if log_type == 'json' else console_formatter
    handler.setFormatter(logging.Formatter(fmt))
    g_logger.addHandler(console)
    g_logger.addHandler(handler)
    g_logger.setLevel(loggerLevel)
    return g_logger


def get_logger(log_name, log_type='file'):
    if log_name not in logger_dict:
        create_logger(log_name, log_type)
        logger_dict[log_name] = logging.getLogger(log_name)
    return logging.getLogger(log_name)

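A usage sketch for get_logger, assuming the process runs inside a tas_token_server checkout (root_path above is derived from the working directory): loggers are cached by name, and the 'json' type writes bare %(message)s lines to the rotating file.

from log_manage import get_logger

log = get_logger("app")  # console + logs/app.log, rotating at 2 MB with 1 backup
log.info("server started")
jlog = get_logger("metrics", log_type="json")  # file lines contain only the message
jlog.info('{"event": "tick"}')
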
#!/usr/bin/env bash
# restart: kill any running token server, then relaunch it in the background
ps -ef | grep '[t]oken_server.py' | awk '{print $2}' | xargs kill -9
nohup python3 -u token_server.py > /dev/null &
echo 'done'
#!/usr/bin/env bash
nohup python3 -u token_server.py > /dev/null &
echo "start token server.."
#!/usr/bin/env bash
ps -ef | grep '[t]oken_server.py' | awk '{print $2}' | xargs kill -9
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import csv
import json
import sys
# import logging
import traceback
from hashlib import md5

import redis

from fetcher import fetcher

# use the remote redis when developing locally (macOS/Windows), localhost in production
ip = '39.108.51.147' if sys.platform in ('darwin', 'win32') else 'localhost'
_pool = redis.ConnectionPool(host=ip, port=6379, db=0, password="icDb29mLy2s")
_redis = redis.Redis(connection_pool=_pool)


class BaseTokenCrawler(object):
    def __init__(self, platform, level, main_num, logger, back_num=-1, mode="redis", thread_num=5):
        self.platform = platform  # platform name, e.g. xueqiu
        self.level = level  # refresh interval: H = 1 hour, D = 1 day, W = 1 week, M = 1 month
        self.main_num = main_num  # number of main accounts: the first main_num rows of the account file; -1 (default) treats every account as a main account
        self.back_num = back_num  # number of backup accounts: the last back_num rows of the account file; -1 (default) disables backup accounts
        self.token_key = "tas_token_{platform}".format(platform=platform)  # redis key for the tokens, e.g. tas_token_xueqiu
        self.file_path = "res/data_{}.csv".format(platform)  # path of the account file, e.g. res/data_xueqiu.csv
        self.mode = mode  # storage backend: oss or redis (default)
        self.thread_num = thread_num  # number of concurrent token-collecting threads
        self.logger = logger

    def collect_token(self, *args):
        """
        **Must be overridden in subclasses.**
        **The column order of the CSV account file must match the parameter order of collect_token.**
        :param args: account information
        :return: token
        """
        raise NotImplementedError

    def get_params_list(self):
        """
        Override this method to build the parameter list dynamically; by default it is
        read from res/data_<platform>.csv.
        :return: list of account rows
        """
        with open(self.file_path, "r") as f:
            reader = csv.reader(f)
            params_list = list(reader)
        return params_list

    def collect_token_list(self, is_main):
        """
        Read account information from the account file and collect tokens.
        :param is_main: whether to use the main accounts
        :return: list of tokens
        """
        params_list = self.get_params_list()
        if is_main:  # collect with the main accounts
            if self.main_num < 0:
                account_list = params_list
            else:
                account_list = params_list[:self.main_num]
        else:  # collect with the backup accounts
            account_list = params_list[-self.back_num:]
        token_list = fetcher(self.collect_token, account_list, self.thread_num)
        # total = len(account_list)
        # success = len(token_list)
        # fail = total - success
        # percent = float(success) / total * 100
        # if percent < 80:
        #     mail.send_email(UID, self.platform, success, fail, percent)
        return token_list

    def store_token(self, tokens):
        """
        Serialize the collected tokens and save them to redis.
        :param tokens: list of tokens
        :return:
        """
        token_str = json.dumps(tokens, separators=(",", ":"))
        try:
            if self.mode == "redis":
                _redis.set(self.token_key, token_str)
            md5_val = md5(token_str.encode()).hexdigest()
            name = "token_md5"
            key = "{}:{}".format(self.mode, self.token_key)
            _redis.hset(name, key, md5_val)
        except Exception:
            self.logger.error(traceback.format_exc())

    def process(self, is_main):
        """
        Collect the token list and save it to redis.
        :param is_main: whether to use the main accounts
        :return:
        """
        tokens = self.collect_token_list(is_main)
        if tokens:
            self.logger.warning("%s got tokens, length: %s", self.platform, len(tokens))
            self.store_token(tokens)
        else:
            self.logger.warning("%s failed to get tokens", self.platform)

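A sketch of the subclass contract described in BaseTokenCrawler's docstrings, assuming a two-column username,password CSV; the module constants mirror what token_server.init_task() looks up, and ExampleTokenCrawler, PLATFORM = "example" and the login logic are all illustrative.

from token_lib import BaseTokenCrawler

PLATFORM = "example"  # required by init_task; tokens land under tas_token_example
LEVEL = "H"           # optional: refresh the main accounts hourly

class ExampleTokenCrawler(BaseTokenCrawler):
    def collect_token(self, username, password, num=0):
        # column order must match res/data_example.csv; num is the worker index
        # ... log in with the account here and return its token, or None on failure ...
        return "token-for-%s" % username
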
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import importlib
import inspect
import itertools
import os
import pkgutil
import time
from queue import PriorityQueue
from threading import Thread

import lib
import token_lib
from log_manage import get_logger

process_list = ()
logger = get_logger('app')
# tie-breaker for queue entries: crawler objects are not orderable, so entries
# with equal timestamps must never fall through to comparing them
_counter = itertools.count()

LEVEL_MAP = {
    "A": 10,  # for testing only
    "B": 30,  # for testing only
    "X": 60,
    "C": 60 * 5,
    "HH": 60 * 30,
    "H": 60 * 60,
    "4H": 60 * 60 * 4,
    "HD": 60 * 60 * 12,
    "D": 60 * 60 * 24,
    "W": 60 * 60 * 24 * 7,
    "M": 60 * 60 * 24 * 31,
    "Z": float("inf")
}
THREAD_NUM = 1
DEFAULT_INTERVAL = "D"
BACK_INTERVAL = "X"


def init_task(queue):
    """
    Initialize the task queue.
    :param queue:
    :return:
    """
    pypath = os.path.dirname(lib.__file__)
    for module_obj in pkgutil.iter_modules([pypath]):
        module_path = "lib.%s" % module_obj[1]
        module = importlib.import_module(module_path)
        cls_members = inspect.getmembers(module, inspect.isclass)
        for cls in cls_members:
            # skip the abstract base class itself; only register real subclasses
            if issubclass(cls[1], token_lib.BaseTokenCrawler) and cls[1] is not token_lib.BaseTokenCrawler:
                platform = getattr(module, "PLATFORM")
                if platform in process_list:
                    continue
                level = getattr(module, "LEVEL", DEFAULT_INTERVAL)
                main_num = getattr(module, "MAIN_NUM", -1)
                back_num = getattr(module, "BACK_NUM", -1)
                mode = getattr(module, "MODE", "redis")
                thread_num = getattr(module, "THREAD_NUM", 5)
                obj = cls[1](platform, level, main_num, logger, back_num, mode, thread_num)
                cur_stamp = time.time()
                queue.put((cur_stamp, next(_counter), obj, True))
                logger.info("Adding platform:%s, level=%s, main_num=%s, back_num=%s, mode=%s, thread_num=%s",
                            platform, level, main_num, back_num, mode, thread_num)


class Worker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.daemon = True
        self.queue = queue

    def run(self):
        while True:
            timestamp, _, task, is_main = self.queue.get()
            curstamp = time.time()
            if curstamp < timestamp:  # not due yet: put the task back and sleep until it is
                self.queue.put((timestamp, next(_counter), task, is_main))
                time.sleep(timestamp - curstamp)
                continue
            task.process(is_main)
            # print("%s updating: %s, %s, %s, %s" % (self.name, task.platform, timestamp, is_main, task.back_num))
            if is_main and task.level == "Z":  # refresh only once
                continue
            elif is_main and task.back_num > 0:  # just used main accounts; use backup accounts next
                newstamp = time.time() + LEVEL_MAP[task.level]
                is_main = False
            elif is_main and task.back_num <= 0:  # no backup accounts; keep using main accounts
                newstamp = time.time() + LEVEL_MAP[task.level]
            else:  # just used backup accounts; go back to main accounts
                newstamp = time.time() + LEVEL_MAP[BACK_INTERVAL]
                is_main = True
            self.queue.put((newstamp, next(_counter), task, is_main))


def main():
    queue = PriorityQueue()
    init_task(queue)
    threads = []
    for _ in range(THREAD_NUM):
        t = Worker(queue)
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    main()
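
A consumer-side sketch (illustrative, using the xueqiu platform named in token_lib's comments): it reads back what store_token() wrote, both the JSON token list and the md5 checksum kept in the token_md5 hash.

import json
import redis

r = redis.Redis(host="localhost", port=6379, db=0, password="icDb29mLy2s")
raw = r.get("tas_token_xueqiu")  # key format: tas_token_<platform>
tokens = json.loads(raw) if raw else []
checksum = r.hget("token_md5", "redis:tas_token_xueqiu")  # field format: <mode>:<token_key>
print(len(tokens), checksum)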