Commit 4354e583 by lichenggang

Initial commit

crawler_process.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import hashlib
import os
import time
import traceback
from datetime import datetime, timedelta

import psutil
from elasticsearch import Elasticsearch

from utils.log_manage import get_logger
from utils.mysql import MySqlOperator
from utils.robots import dd_send_msg
from utils.tas_redis import task_redis

logger = get_logger('crawler_process')
dashboard = MySqlOperator('dashboard')
ichunt_elk = Elasticsearch(host="127.0.0.1", port=9200, timeout=20)

def get_process_task(task_code):
    keys = task_redis.keys(task_code + '*')
    keys_len = sum([task_redis.llen(k) for k in keys])
    abnormal_keys = task_redis.keys('abnormal_' + task_code + '*')
    abnormal_keys_len = sum([task_redis.llen(k) for k in abnormal_keys])
    return keys_len, abnormal_keys_len

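# Illustrative note: for task_code 'arg' this sums llen() over keys matching
# 'arg*' (pending task queues) and 'abnormal_arg*' (failed-task queues), so
# the returned pair is (remaining tasks, remaining failed tasks).
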
def get_error_ratio(task_code, time_range=30):
    utc_since = datetime.utcnow() - timedelta(minutes=time_range)
    index = 'logstash-' + utc_since.strftime('%Y.%m.%d')
    query = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"term": {"taskCode": task_code}},
                    {"range": {"@timestamp": {"gte": utc_since.strftime("%Y-%m-%dT%H:%M:%SZ")}}}
                ]
            }
        },
        "aggs": {
            "avgFailRate": {"avg": {"field": "failRate"}}
        }
    }
    response = ichunt_elk.search(index=index, doc_type='doc', body=query)
    value = response['aggregations']['avgFailRate']['value']
    value = value if value is not None else 0.0
    return round(value, 3)

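# Illustrative only: the aggregation response consumed above looks roughly like
#   {"aggregations": {"avgFailRate": {"value": 12.3}}}
# (value made up); when no documents match, "value" comes back as None and the
# guard above maps it to 0.0.
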
def get_crawler_process_info():
    flags = task_redis.hgetall('task_running_flag')
    pids = os.popen("ps -ef | grep async_tas_caller.py | grep -v grep | awk '{print $2}'")
    process = dict()
    for pid in pids:
        p = psutil.Process(int(pid))
        info = p.as_dict(attrs=['cmdline', 'create_time', 'cpu_times'])
        info['cmdline'] = {k: v for k, v in zip(info['cmdline'][2::2], info['cmdline'][3::2])}
        info_dict = dict()
        info_dict['pid'] = pid.strip()
        info_dict['task_code'] = info['cmdline']['-r']
        info_dict['platform'] = info['cmdline']['-mq'].split("_")[0]
        info_dict['concurrency'] = int(info['cmdline']['-c'])
        info_dict['queue'] = info['cmdline']['-mq'] if '-mq' in info['cmdline'] else ''
        unic = ''.join([str(info['create_time']), info_dict['task_code'], str(info_dict['concurrency'])]).encode()
        info_dict['unique_code'] = hashlib.md5(unic).hexdigest()
        info_dict['status'] = 1 if info_dict['task_code'].encode() in flags else 0
        info_dict['task_type'] = info['cmdline']['-ef'] if '-ef' in info['cmdline'] else ''
        info_dict['start_time'] = int(info['create_time'])
        info_dict['run_time'] = round(sum(info['cpu_times']), 3)
        info_dict['remain_task'], info_dict['remain_wrong_task'] = get_process_task(info_dict['task_code'])
        info_dict['wrong_radio'] = get_error_ratio(info_dict['task_code'])
        process[info_dict['unique_code']] = info_dict
    return process

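# Illustrative only: for a worker started as (flags per the parsing above,
# values made up)
#   python3 async_tas_caller.py -r arg -mq arrow_sku -c 4 -ef update
# psutil reports cmdline as ['python3', 'async_tas_caller.py', '-r', 'arg',
# '-mq', 'arrow_sku', '-c', '4', '-ef', 'update'], and zipping the [2::2] and
# [3::2] slices pairs each flag with its value:
#   {'-r': 'arg', '-mq': 'arrow_sku', '-c': '4', '-ef': 'update'}
# so task_code='arg', platform='arrow', concurrency=4.
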
def info_update(process_info):
    # Step 1: insert or update each process record in MySQL, then sync the
    # Redis pause flag or kill the process according to the stored status.
    for k, v in process_info.items():
        status = dashboard.check_exist_by_unicode(v['unique_code'])  # stored status of this task
        if status:
            dashboard.update_process(v)  # update
            if status[0] == 0 and v['status'] == 1:  # running
                logger.info('%s: back to running', v['task_code'])
                task_redis.hdel('task_running_flag', v['task_code'])
            elif status[0] == 1 and v['status'] == 0:  # paused
                logger.info('%s: pause the crawler', v['task_code'])
                task_redis.hset('task_running_flag', v['task_code'], '1')
            elif status[0] == 2:  # kill
                logger.info('%s: kill the process', v['task_code'])
                task_redis.hdel('task_running_flag', v['task_code'])
                os.system('kill -9 %s' % v['pid'])
        else:
            dashboard.insert_process(v)  # insert
    # Step 2: delete zombie records from MySQL:
    # 1. the process is gone but the MySQL record remains
    # 2. status is synced with MySQL as the source of truth
    ps = dashboard.get_process_alive()  # records still alive in MySQL
    for p in ps:
        if p[1] not in process_info:
            dashboard.update_status(2, p[1])
            logger.info('%s: update status', p[0])

def test():
    flags = task_redis.hgetall('task_running_flag')
    ps = dashboard.get_process_alive()  # records still alive in MySQL

def main():
    while 1:
        try:
            pi = get_crawler_process_info()
            info_update(pi)
        except Exception:
            logger.error(traceback.format_exc())
            dd_send_msg(traceback.format_exc())
        finally:
            time.sleep(60)


if __name__ == "__main__":
    main()
monitor.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
import traceback

from utils.es_api import get_task_error_rate
from utils.log_manage import get_logger
from utils.mongo import MongoOperator
from utils.robots import dd_send_msg
from utils.tas_redis import get_flag

TIME_INTERVAL = 90  # minutes

logger = get_logger('task_monitor')
mongo = MongoOperator()

def task_fail_rate_monitor():
    while True:
        try:
            msg = get_task_error_rate(time_range=TIME_INTERVAL)
            if msg:
                logger.info(msg)
                dd_send_msg(msg)
            time.sleep(TIME_INTERVAL * 60)
        except Exception:
            logger.error(traceback.format_exc())
            dd_send_msg(traceback.format_exc())
            time.sleep(60 * 5)

def mongo_data_expired_rate_monitor():
    while True:
        try:
            msg = mongo.get_platform_data_expired_rate()
            if msg:
                logger.info(msg)
                dd_send_msg(msg)
            time.sleep(3 * 60 * 60)
        except Exception:
            logger.error(traceback.format_exc())
            dd_send_msg(traceback.format_exc())
            time.sleep(60 * 5)

def task_running_flag_monitor():
    while True:
        try:
            msg = get_flag()
            if msg:
                logger.info(msg)
                dd_send_msg(msg)
            time.sleep(3 * 60 * 60)
        except Exception:
            logger.error(traceback.format_exc())
            dd_send_msg(traceback.format_exc())
            time.sleep(60 * 5)

def main():
    thread_monitor1 = threading.Thread(target=task_fail_rate_monitor)
    thread_monitor1.start()
    thread_monitor2 = threading.Thread(target=mongo_data_expired_rate_monitor)
    thread_monitor2.start()
    thread_monitor3 = threading.Thread(target=task_running_flag_monitor)
    thread_monitor3.start()


if __name__ == "__main__":
    main()
[program:af_schedule]
command = airflow scheduler
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:af_server]
command = /usr/bin/airflow webserver
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = liexin521
redirect_stderr = true
[program:cp]
directory = /data/cage/tas_monitor
command = python3 ./crawler_process.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:data_manager]
directory = /data/cage/data_manager
command = python3 ./server.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:keywords]
directory = /data/cage/tas_caller
command = python3 ./db2rds/keywords.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:master_ip]
directory = /data/cage/tas_caller
command = python3 ./script/update_master_ip.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:monitor]
directory = /data/cage/tas_monitor
command = python3 ./monitor.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:redis]
command = redis-server /home/liexin521/redis.conf
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:tas]
directory = /data/cage/tornado_api_server
command = python3 ./server.py --port=32345 --region=master
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
[program:token_server]
directory = /data/cage/tas_token_server
command = python3 ./token_server.py
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
startretries = 2
user = root
redirect_stderr = true
utils/__init__.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
utils/config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys

ENVIRONMENT = 'test' if sys.platform in ('darwin', 'win32') else 'produce'


def get_mysql_conf(db):
    if ENVIRONMENT == 'produce':
        user, psd = 'acuser', 'acuserXty2201'
        host = '172.18.137.35'
    else:
        user, psd = db, db + '#zsyM'
        host = '192.168.2.232'
    conf = {
        'host': host,
        'port': 3306,
        'user': user,
        'password': psd,
        'db': db,
        'charset': 'utf8'
    }
    return conf

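# Minimal usage sketch (mirrors how utils/mysql.py consumes this config):
#   import pymysql
#   conn = pymysql.connect(**get_mysql_conf('dashboard'))
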
utils/es_api.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

ichunt_elk = Elasticsearch(host="127.0.0.1", port=9200, timeout=20)

task_mapping = {
    'arg': 'arrow SKU update',
    'arc': 'arrow new-product crawl',
    'ang': 'avnet SKU update',
    'anc': 'avnet new-product crawl',
    'alg': 'allied SKU update',
    'alc': 'allied new-product crawl',
    'rog': 'rochester SKU update',
    'vrg': 'verical SKU update',
    'rsg': 'rs SKU update',
    'mug': 'mouser SKU update',
    'dggw': 'digikey SKU update',
    'dgc': 'digikey new-product crawl',
    'r24g': 'rutronik SKU update',
    'r24c': 'rutronik new-product crawl',
    'skw': 'search-keyword SKU update',
    'pc': 'platform price comparison',
    'spu': 'SPU tax-rate crawl',
    'bkc': 'buerklin new-product crawl',
    'bkg': 'buerklin SKU update',
    'bkgn': 'buerklin SKU update',
    'msg': 'master SKU update',
    'csg': 'chip1stop SKU update',
    'elgw': 'element14 SKU update',
    'elc': 'element14 new-product update',
    'poc': 'powell new-product crawl',
    'pog': 'powell SKU update',
    'mcc': 'microchip new-product crawl',
    'mcg': 'microchip SKU update',
    'mtg': 'master SKU update',
    'hlg': 'heilind SKU update',
    'tic': 'TI new-product crawl',
    'tig': 'TI SKU update',
    'mxc': 'maxim new-product crawl',
    'mxg': 'maxim SKU update',
    'ikkw': 'ickey self-operated crawl',
    'ikid': 'ickey index crawl',
    'mkg': 'marki SKU update',
    'cfc': 'corestaff new-product crawl',
    'cfg': 'corestaff SKU update',
    'lcg': 'Shenzhen LCSC SKU update',
    'lcc': 'Shenzhen LCSC new-product crawl',
    'icnud': 'IC trading network update',
    'apc': 'aipco new-product update',
    'apg': 'aipco SKU update',
    'fuc': 'future new-product update',
    'fug': 'future SKU update',
    'tmeg': 'tme SKU update',
    'oyg': 'oneyac SKU update',
    'wpgg': 'wpg SKU update'
}
task_rate_warning = {'spu': 70, 'pc': 90, 'ikkw': 50, 'ikid': 50, 'icnud': 95}


# Sample alert line: Task (dggw) digikey SKU update, error rate 31.1%
def gen_msg(time_range, buckets):
    msg = 'TAS task monitor alert, past %d minutes:\n' % time_range
    demo = 'Task: {} {} average error rate {:.2f}%'
    msgs = []
    for task in buckets:
        key = task['key']
        if key in task_rate_warning and task['avgFailRate']['value'] < task_rate_warning[key]:
            continue
        desc = task_mapping[key] if key in task_mapping else 'unknown'
        msg_line = demo.format(key, desc, task['avgFailRate']['value'])
        msgs.append(msg_line)
        msg += msg_line + '\n'
    if msgs:
        return msg
    else:
        return

def get_task_error_rate(time_range=30, error_rate=30):
    utc_since = datetime.utcnow() - timedelta(minutes=time_range)
    index = 'logstash-' + utc_since.strftime('%Y.%m.%d')
    query = {
        "size": 0,
        "query": {
            "range": {
                "@timestamp": {
                    "gte": utc_since.strftime("%Y-%m-%dT%H:%M:%SZ")
                }
            }
        },
        "aggs": {
            "taskCodes": {
                "terms": {"field": "taskCode", "size": 40},
                "aggs": {
                    "avgFailRate": {"avg": {"field": "failRate"}},
                    "avgFailRateFilter": {
                        "bucket_selector": {
                            "buckets_path": {"avgFailRate": "avgFailRate"},
                            "script": {"source": "params.avgFailRate >= " + str(error_rate)}
                        }
                    }
                }
            }
        }
    }
    response = ichunt_elk.search(index=index, doc_type='doc', body=query)
    buckets = response['aggregations']['taskCodes']['buckets']
    if buckets:
        return gen_msg(time_range, buckets)
    return

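# Illustrative only: each bucket handed to gen_msg has roughly the shape
#   {"key": "dggw", "doc_count": 120, "avgFailRate": {"value": 31.1}}
# (numbers made up); the bucket_selector has already dropped buckets whose
# average fail rate is below `error_rate`, so gen_msg only applies the
# per-task overrides in task_rate_warning.
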
if __name__ == "__main__":
    print(get_task_error_rate(error_rate=25))
utils/log_manage.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging
import os
from logging.handlers import RotatingFileHandler

loggerLevel = logging.INFO
root_path = os.getcwd().split('tas_monitor')[0] + 'tas_monitor'
log_dir = root_path + '/logs'
# log_dir = "logs"
console_formatter = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'
json_formatter = '%(message)s'
logger_dict = dict()

def create_logger(log_name, log_type):
    g_logger = logging.getLogger(log_name)
    log_path = log_dir
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    logfile = log_name + ".log"
    log_file = "%s/%s" % (log_path, logfile)
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(console_formatter))
    # rotate at 2 MB, keeping one backup file
    handler = RotatingFileHandler(log_file, maxBytes=2 * 1024 * 1024, backupCount=1)
    fmt = json_formatter if log_type == 'json' else console_formatter
    handler.setFormatter(logging.Formatter(fmt))
    g_logger.addHandler(console)
    g_logger.addHandler(handler)
    g_logger.setLevel(loggerLevel)
    return g_logger

def get_logger(log_name, log_type='file'):
    if log_name not in logger_dict:
        create_logger(log_name, log_type)
        logger_dict[log_name] = logging.getLogger(log_name)
    return logging.getLogger(log_name)

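# Minimal usage sketch (names match this module):
#   logger = get_logger('demo')
#   logger.info('hello')
# Repeat calls with the same name hit the logger_dict cache, so the console
# and file handlers are attached only once per logger name.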
utils/mongo.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import time
from urllib.parse import quote_plus

import pymongo

MG_HOST_SET = {
    'test': '192.168.1.237',
    'produce': '172.18.137.23'
}
ENV = 'test' if sys.platform in ('darwin', 'win32') else 'produce'


def get_mongo_conf():
    host = MG_HOST_SET[ENV]
    conf = {
        "host": host,
        "port": 27017,
        "database": 'ichunt',
        "user": 'ichunt',
        "password": 'huntmon66499'
    }
    return conf

class MongoOperator:
    def __init__(self):
        config = get_mongo_conf()
        uri = 'mongodb://%s:%s@%s/%s' % (
            quote_plus(config['user']), quote_plus(config['password']), config['host'], config['database'])
        self.conn = pymongo.MongoClient(uri)
        self.db = self.conn[config['database']]
        # element14 chip1stop ti
        self.colls = ('alliedelec', 'arrow', 'avnet', 'buerklin', 'digikey', 'master', 'rs', 'rochester',
                      'verical', 'powell', 'microchip', 'tme', 'heilind', 'maxim', 'aipco', 'company', 'rutronik',
                      'mouser', 'corestaff', 'wpg', 'szlc', 'element14', 'chip1stop', 'future')

    @staticmethod
    def get_unexpired_time(plat):
        if plat == 'powell':
            ts = int(time.time()) - 3600 * 24 * 7  # 7d
        elif plat in ('digikey',):
            ts = int(time.time()) - 3600 * 72  # 72h
        elif plat in ('microchip', 'heilind', 'element14', 'chip1stop', 'future'):
            ts = int(time.time()) - 3600 * 24  # 24h
        else:
            ts = int(time.time()) - 3600 * 48  # 48h
        return ts

    @staticmethod
    def get_query(ts, plat):
        expired_query, total_query = {'time': {"$lt": ts}}, {}
        if plat not in ('element14', 'chip1stop', 'future'):
            expired_query['is_error'] = 0
            total_query['is_error'] = 0
        elif plat in ('element14', 'chip1stop'):
            # for these platforms the query counts *unexpired* docs; the
            # caller flips it against a fixed total below
            expired_query = {'time': {"$gt": ts}}
        return expired_query, total_query

    def get_platform_data_expired_rate(self):
        msg = 'Data expiry-rate monitor alert, past 3 hours:\n'
        demo = 'Platform: {} data expiry rate {:.2f}%'
        msgs = []
        for plat in self.colls:
            coll = self.db[plat]
            ts = self.get_unexpired_time(plat)
            eq, tq = self.get_query(ts, plat)
            expired = coll.count(eq)
            if plat == 'element14':
                total = 600000
                expired = total - expired
            elif plat == 'chip1stop':
                total = 800000
                expired = total - expired
            else:
                total = coll.count(tq)  # element14/chip1stop skip the total-count query; totals are set manually above
            if total > 0:
                rate = expired / total * 100
                print(plat, expired, total, rate)
                if rate >= 10:
                    msg_line = demo.format(plat, rate)
                    msgs.append(msg_line)
                    msg += msg_line + '\n'
            else:
                print(plat, expired, total)
        if msgs:
            return msg
        else:
            return

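# Worked example of the element14 inversion above (numbers illustrative):
# get_query returns {'time': {'$gt': ts}} for element14, so coll.count(eq)
# counts *unexpired* docs; with the fixed total of 600,000,
#   expired = 600000 - 540000 = 60000
#   rate = 60000 / 600000 * 100 = 10.0%  -> exactly at the alert threshold.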
utils/mysql.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql

from utils.config import get_mysql_conf


class MySqlOperator:
    def __init__(self, db_key_name):
        config = get_mysql_conf(db_key_name)
        self.db = pymysql.connect(**config)

    def re_connect(self):
        try:
            self.db.ping()
        except Exception:
            self.db.connect()

    def check_exist_by_unicode(self, uc):
        with self.db.cursor() as cursor:
            sql = "SELECT status FROM lie_spider_board WHERE unique_code = %s"
            cursor.execute(sql, uc)
            result = cursor.fetchone()
            return result

    def insert_process(self, p):
        with self.db.cursor() as cursor:
            sql = """
                INSERT INTO lie_spider_board(task_code, platform, concurrency, queue, unique_code, status,
                task_type, start_time, run_time, remain_task, remain_wrong_task, wrong_radio) VALUES
                (%(task_code)s,%(platform)s,%(concurrency)s,%(queue)s,%(unique_code)s,%(status)s,%(task_type)s,
                %(start_time)s,%(run_time)s,%(remain_task)s,%(remain_wrong_task)s,%(wrong_radio)s)
            """
            cursor.execute(sql, p)
            self.db.commit()

    def update_process(self, p):
        with self.db.cursor() as cursor:
            sql = """
                UPDATE lie_spider_board SET run_time=%(run_time)s, remain_task=%(remain_task)s,
                remain_wrong_task=%(remain_wrong_task)s, wrong_radio=%(wrong_radio)s WHERE unique_code=%(unique_code)s
            """
            cursor.execute(sql, p)
            self.db.commit()

    def get_process_alive(self):
        with self.db.cursor() as cursor:
            sql = "SELECT task_code, unique_code FROM lie_spider_board WHERE status IN (0, 1)"
            cursor.execute(sql)
            result = cursor.fetchall()
            return result

    def update_status(self, s, u):
        with self.db.cursor() as cursor:
            sql = "UPDATE lie_spider_board SET status=%s WHERE unique_code=%s"
            cursor.execute(sql, (s, u))
            self.db.commit()
utils/robots.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests

dd_robot_api = 'https://oapi.dingtalk.com/robot/send?access_token='
tom = 'c19beeab837f5c6e019e7fc602d470704d634ae33510f40fb0e903cde215fb24'


def dd_send_msg(msg, robot=tom):
    data = {
        "msgtype": "text",
        "text": {
            "content": msg
        }
    }
    requests.post(dd_robot_api + robot, json=data)


if __name__ == "__main__":
    dd_send_msg('Hey, you are sending too many messages!')
utils/tas_redis.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys

import redis

from utils.es_api import task_mapping

env = 'test' if sys.platform in ('darwin', 'win32') else 'produce'


def get_redis_conf():
    conf = {
        'host': 'localhost',
        'port': 6379,
        'db': 0,
    }
    if env.startswith('produce'):
        conf['password'] = 'icDb29mLy2s'
    return conf


pool = redis.ConnectionPool(**get_redis_conf())
task_redis = redis.Redis(connection_pool=pool)

def get_flag():
    flags = task_redis.hgetall('task_running_flag')
    if flags:
        msg = 'TAS scheduled tasks currently paused:\n'
        demo = 'Task: {} {}'
        for k in flags:
            k = k.decode()
            desc = task_mapping[k] if k in task_mapping else 'unknown'
            line = demo.format(k, desc)
            msg += line + '\n'
        return msg


if __name__ == "__main__":
    get_flag()