Commit 2e1d54f9 by lzzzzl

Commit changes

parent 5b397aa3
Showing with 1535 additions and 269 deletions
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7.3 (C:\Users\ichunt\AppData\Local\Programs\Python\Python37\python.exe)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="data" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" serverName="lzzzzl">
<serverData>
<paths name="lzzzzl">
<serverdata>
<mappings>
<mapping deploy="/data2/bosslin" local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="useProjectProfile" value="false" />
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7.3 (C:\Users\ichunt\AppData\Local\Programs\Python\Python37\python.exe)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/bosslin.iml" filepath="$PROJECT_DIR$/.idea/bosslin.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SshConfigs">
<configs>
<sshConfig authType="PASSWORD" host="47.106.81.102" id="14ecf9db-e841-4ffb-8dc1-2d78e995a515" port="2311" customName="lzzzzl" nameFormat="CUSTOM" username="liexin521" />
<sshConfig authType="PASSWORD" host="47.106.81.102" id="fcd842f4-a323-4e79-9a96-76767cce7b47" port="2311" customName="lzzzzl" nameFormat="CUSTOM" username="liexin521" />
</configs>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="WebServers">
<option name="servers">
<webServer id="713fdefb-c869-433f-861c-03014a1e4b27" name="lzzzzl" url="http://47.106.81.102">
<fileTransfer host="47.106.81.102" port="2311" accessType="SFTP">
<advancedOptions>
<advancedOptions dataProtectionLevel="Private" />
</advancedOptions>
<option name="port" value="2311" />
</fileTransfer>
</webServer>
</option>
</component>
</project>
\ No newline at end of file
......@@ -74,3 +74,6 @@ if __name__ == '__main__':
# ERP: update CMP_CODE
elif sys.argv[1] == 'erp_code':
PiErp().pipeline_erp_CmpCode()
# EFFECT SKU
elif sys.argv[1] == 'effect_sku':
PiGoods.effect_sku()
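# Usage sketch (assuming this dispatch block is the script's entry point):
#   python <entry>.py effect_sku   -> runs PiGoods.effect_sku()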
from pipeline.pi_rank import PiRank
from config.conn_list import ConnList
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from utils.excel_handler import ExcelHandler
from extract.ex_erp import ExERP
from pipeline.pi_rank import PiRank
from pipeline.pi_erp import PiErp
from pipeline.pi_lx_log import PiLxLog
import math
import requests
import time
import json
from translate.ts_erp import TsERP
from pipeline.pi_weekly import PiWeekly
def test():
rd = []
db = ConnList.CreditSource()
sql = "SELECT erp_company_code,poolfund_no,poolfund_source_no,entru_no,order_amount,business_time,\
deadline_day,receive_time,return_amount,return_time,is_settle,delay_day,delay_amount,\
is_credit,gross_profit,poolfund_type,source_type\
FROM lie_basic_detail"
result = DBHandler.read(db, sql)
for row in result:
rd.append({
'company': row[0],
'poolfund_no': row[1],
'poolfund_source_no': row[2],
'entru_no': row[3],
'order_amount': row[4],
'business_time': DateHandler.unix_to_date(row[5], fmt="%Y-%m-%d"),
'deadline_day': row[6],
'receive_time': DateHandler.unix_to_date(row[7], fmt="%Y-%m-%d"),
'return_amount': row[8],
'return_time': DateHandler.unix_to_date(row[9], fmt="%Y-%m-%d"),
'is_settle': '是' if row[10] == 1 else '否',
'delay_day': row[11],
'delay_amount': row[12],
'is_credit': '有' if row[13] == 1 else '无',
'gross_profit': row[14],
'poolfund_type': row[15],
'source_type': '供应链' if row[16] == 1 else '科技'
})
# Excel title row
title = ['公司', '资金池编号', '资金池来源编码', '入仓号', '下单金额', '业务时间', '订单期限(账期天数)', '应收日期(业务日期+期限)', '回款金额', '回款日期', '是否结清',
'逾期支付天数', '逾期支付金额', '信用', '毛利', '资金池类型', '来源']
# Excel content keys
content = ['company', 'poolfund_no', 'poolfund_source_no', 'entru_no', 'order_amount', 'business_time',
'deadline_day', 'receive_time',
'return_amount', 'return_time',
'is_settle', 'delay_day', 'delay_amount',
'is_credit', 'gross_profit', 'poolfund_type', 'source_type']
ExcelHandler.write_to_excel(title, content, rd, "result", result_type=2)
if __name__ == '__main__':
test()
from utils.db_handler import DBHandler
from config.conn_list import ConnList
import random
data = ExcelHandler.read_to_excel("原厂对接秘钥.xlsx", "Sheet1", 1)
db = ConnList.Dashboard()
def insert(canal, platform_name):
platform_type = 1
land_send = random.randint(5, 7)
hk_send = random.randint(3, 4)
usd_ratio = random.choice(["1.000", "1.050", "1.100", "1.150"])
rmb_radio = round(random.uniform(8.3335, 8.712), 3)
fre = random.randint(8, 24)
start_time = random.randint(1577808000, 1603866647)
end_time = random.randint(start_time, 1603866647)
sql = "INSERT INTO lie_data_manage (platform_name,platform_cn_name,platform_type, \
status,on_off,frequency,\
start_time,end_time,manual_push,\
canal,rmb_ratio,usd_ratio,\
is_type,`key`,`condition`,\
hk_delivery_time,cn_delivery_time) VALUES \
('%s','%s','%s',\
'%s','%s','%s',\
'%s','%s','%s',\
'%s','%s','%s',\
'%s','%s','%s',\
'%s','%s')" % (
platform_name, platform_name, platform_type,
"1", "2", fre,
start_time, end_time, "2",
canal, rmb_radio, usd_ratio,
"1","","",
hk_send, land_send)
# print(sql)
DBHandler.insert(db, sql)
# print(platform_name, canal)
for row in data:
canal = row[0]
platform = row[1]
insert(canal, platform)
print(len(data))
\ No newline at end of file
......@@ -4,7 +4,9 @@
dashboard_server = {
'host': 'localhost',
'user': 'dashboard',
# 'user': 'root',
'password': 'ichunt5Dashboard@',
# 'password': '123',
'db_name': 'dashboard'
}
......@@ -27,6 +29,10 @@ bigdata_server = {
'user': 'bigdata',
'password': 'bdYm2yy2mmyzlmlly',
'db_name': 'bigdata'
# 'host': '192.168.1.234',
# 'user': 'ichunt',
# 'password': 'ichunt',
# 'db_name': 'bigdata'
}
wms_server = {
......@@ -43,6 +49,7 @@ lxdata_server = {
'db_name': 'liexin_data'
}
# Self-operated (zy) database
zy_server = {
'host': '172.18.137.33',
'user': 'LxDDUsedRead',
......@@ -50,6 +57,7 @@ zy_server = {
'db_name': 'liexin_data'
}
# Behavior database
behavior_server = {
'host': '172.18.137.21',
'user': 'lxbehaviorread',
......
......@@ -11,6 +11,7 @@ supplier = {
'alliedelec': 12,
'avnet': 13,
'mouser': 14,
'猎芯联营': 17,
'peigenesis': 19,
'powell': 20,
'rs': 21,
......@@ -21,10 +22,7 @@ supplier = {
'microchip': 1677,
'heilind': 1678,
'ti': 1679,
'撮合': -1000,
'猎芯联营-技术采集': 17,
'猎芯联营-渠道开发': 17,
'猎芯自营': 10000,
'wpg': 1680,
}
# 猎芯联营 (Liexin consignment) collection
......
# _*_ coding:utf-8 _*_
import requests
import json
from extract.ex_base import Base
from utils.date_handler import DateHandler
url = "https://api.baidu.com/json/tongji/v1/ReportService/getData"
header = {
"username": "ichunt2020",
"password": "Ichunt2020123",
"token": "d49070772f60c685cad5b32aff698de4",
"account_type": 1
}
class ExBaidu(Base):
def ex_puv(self):
body = {
"header": header,
"body": {
"site_id": 9832384,
"start_date": DateHandler.today_between_day(2),
"end_date": DateHandler.today_between_day(1),
"metrics": "pv_count,visitor_count,ip_count",
"method": "overview/getTimeTrendRpt"
}
}
PV = 0
UV = 0
try:
res = requests.post(url, json.dumps(body)).json()
data = res['body']['data'][0]['result']['items'][1][1]
PV = data[0]
UV = data[1]
except:
pass
return PV,UV
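# Usage sketch, matching the call added in PiDaily later in this commit:
#   pv, uv = ExBaidu('').ex_puv()   # yesterday's PV/UV; (0, 0) if the API call fails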
......@@ -10,7 +10,7 @@ class ExERP(Base):
def __init__(self, date):
self.api_url = 'http://119.23.228.186:6868/ormrpc/services'
self.cmp_url = 'http://119.23.228.186:8081'
self.cmp_url = 'http://119.23.228.186:50005'
self.date = date
self.credit = ConnList.Credit()
self.credit_source = ConnList.CreditSource()
......
......@@ -11,7 +11,7 @@ class ExMarket(Base):
# Columns to select
col = ['user_id']
col_str = super().col_to_str(col)
# Connect to the database
db = ConnList.Order()
con_str = super().condition_to_str(condition['condition'])
......@@ -25,6 +25,7 @@ class ExMarket(Base):
"""
Coupon order information
NOTE: purpose not yet understood
"""
def ex_user_coupon(self, condition):
......@@ -42,7 +43,15 @@ class ExMarket(Base):
WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# Convert the result rows to dicts
# note: TIME 1HOUR, runs every day
final_result = super().result_to_dict(col, results)
return final_result
"""
GOLANG
"""
\ No newline at end of file
......@@ -60,6 +60,7 @@ class ExPurchase(Base):
"""
Purchase order detail items
Frontend
"""
def purchase_items(self, condition):
......
......@@ -13,6 +13,7 @@ from load.load_mysql import LoadMysql
from utils.excel_handler import ExcelHandler
from config.db import rabbitMq_server
from utils.rabbitMq_handler import Customer
import traceback
class ExSkuExpose(Base):
......@@ -166,7 +167,7 @@ class ExSkuExpose(Base):
lowest_supplier = ExSkuExpose.merge_lowest_price(goods_name, ladder_price[0], supplier_type,
lowest_supplier)
except:
pass
traceback.print_exc()
# Compute the lowest price
lowest_result = ExSkuExpose.cal_lowest_price(lowest_supplier)
......@@ -180,17 +181,17 @@ class ExSkuExpose(Base):
data = ex_order.order_items(where)
# Iterate over the order data
for row in data:
supplier_name = row['supplier_name']
canal = row['canal']
# Special handling for the 猎芯联营 channel
if supplier_name == '猎芯联营':
supplier_name = '猎芯联营-渠道开发' if canal in sku_cal['猎芯联营-渠道开发']['canal'] else '猎芯联营-技术采集'
# Update sku_cal
if supplier_name in sku_cal:
sku_cal[supplier_name]['order_sku'] += 1
if row['status'] > 2:
sku_cal[supplier_name]['order_paid_sku'] += 1
# for row in data:
# supplier_name = row['supplier_name']
# canal = row['canal']
# # Special handling for the 猎芯联营 channel
# if supplier_name == '猎芯联营':
# supplier_name = '猎芯联营-渠道开发' if canal in sku_cal['猎芯联营-渠道开发']['canal'] else '猎芯联营-技术采集'
# # Update sku_cal
# if supplier_name in sku_cal:
# sku_cal[supplier_name]['order_sku'] += 1
# if row['status'] > 2:
# sku_cal[supplier_name]['order_paid_sku'] += 1
# SKU metric calculation
for supplier_name in sku_cal:
......@@ -276,6 +277,7 @@ class ExSkuExpose(Base):
if supplier_name in lowest_result:
sku_cal[supplier_name]['lowest_radio'] = str(round(int(lowest_result[supplier_name]) / len(sku_cal[supplier_name]['goods_name']) * 100, 2)) + '%' if len(sku_cal[supplier_name]['goods_name']) > 0 else '0%'
# print(len(sku_cal))
# Convert the data to email format
index = 1
e_data = {'day': DateHandler.now_date(1, 1)}
......
from utils.excel_handler import ExcelHandler
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from config.conn_list import ConnList
import datetime
data = ExcelHandler.read_to_excel('待分配客户-重新分配.xls', 'Sheet1', 1)
data.pop(0)
db = ConnList.Order()
ic_db = ConnList.IcData()
ds_db = ConnList.WrCrm()
now_ts = DateHandler.now_datetime()
com_types = {'默认': 0, '代工厂': 1, '终端': 2, '代理商': 3, '贸易商': 4, '其他': 5}
cus_types = {'默认': 0, '老板': 1, '采购': 2, '工程师': 3, '学生': 4, '其他': 5}
print(len(data))
for row in data:
email = str(row[0]).strip()
mobile = str(row[1]).strip()
company = str(row[2]).strip()
sale_man = str(row[3]).strip()
if mobile != '':
sql = "SELECT user_id FROM lie_user_main WHERE mobile = \'%s\'" % mobile
else:
sql = "SELECT user_id FROM lie_user_main WHERE email = \'%s\'" % email
user = DBHandler.read(db, sql)
if len(user) > 0:
outter_uid = user[0][0]
sql = "select userId from user_info where name = \'%s\'" % sale_man
sale = DBHandler.read(db=ic_db, sql=sql)
if len(sale) > 0:
sale_id = sale[0][0]
sql = "SELECT user_id FROM lie_user WHERE outter_uid = %d AND source IN (1,2)" % outter_uid
is_exist = DBHandler.read(ds_db, sql)
if len(is_exist) > 0:
user_id = is_exist[0][0]
sql = "UPDATE lie_salesman SET user_id = %d,sale_id = %d WHERE user_id = %d" % (user_id, sale_id, outter_uid)
DBHandler.update(ds_db, sql)
# print(len(data))
from utils.db_handler import DBHandler
from config.conn_list import ConnList
from utils.excel_handler import ExcelHandler
import pymysql
# # sql = "SELECT goods_name,brand_name,param FROM lie_bom_base_data"
# sql = "SELECT brand_name FROM lie_bom_brand_name"
# db = ConnList.Dashboard()
#
#
# result = DBHandler.read(db, sql)
# gl = []
# bl = []
# pl = []
# rd = []
#
# for row in result:
# brand_name = row[0]
# bl.append(brand_name)
#
# # for row in result:
# # goods_name = row[0]
# # brand_name = row[1]
# # param = row[2]
# #
# # gl.append(goods_name)
# # if brand_name not in bl:
# # bl.append(brand_name)
# # pl.append(param)
#
# # for goods_name in gl:
# # rd.append({
# # 'target': goods_name,
# # 'value': 'goods_name'
# # })
#
# for brand_name in bl:
# rd.append({
# 'target': brand_name,
# 'value': 'brand_name'
# })
#
# # for param in pl:
# # rd.append({
# # 'target': param,
# # 'value': 'param'
# # })
#
#
# title = ['target', 'value']
# content = ['target', 'value']
#
# ExcelHandler.write_to_excel(title, content, rd, "result2", result_type=2)
sql = "SELECT MAX(id) FROM lie_spu"
conn = pymysql.connect("192.168.1.235", "learning_data", "learning_data#zsyM", "learning_data", charset='utf8')
data = DBHandler.read(conn, sql)
max_id = data[0][0]
start = 1
end = 1000
rd = []
while True:
sql = "SELECT id,spu_name FROM lie_spu WHERE id BETWEEN %d AND %d" % (start, end)
result = DBHandler.read(conn, sql)
for row in result:
spu_id = row[0]
spu_name = row[1]
if spu_name.isdigit() and len(spu_name) <= 4:
print('digit: ' + spu_name + ' id: ' + str(spu_id))
rd.append({'target': spu_name, 'value': spu_id})
start = end + 1
end = end + 1000
if end % 10000 == 0:
print(end)
if end > max_id:
break
title = ['target', 'value']
content = ['target', 'value']
ExcelHandler.write_to_excel(title, content, rd, "result2", result_type=2)
# print(max_id)
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.helpers import bulk
from utils.db_handler import DBHandler
import pymysql
from utils.excel_handler import ExcelHandler
from utils.date_handler import DateHandler
import threading
from decimal import Decimal
from decimal import getcontext
import re
col_dict = {
'阻值(欧姆)': 'resistance',
'容值': 'capacity',
'电感值': 'inductance',
'额定电压': 'rated_voltage',
'额定电流': 'rated_current',
'功率': 'power',
'直流电阻(内阻)': 'dc_resistance',
'精度': 'accuracy',
'封装': 'encap',
'温漂系数(介质材料)': 'temperature'
}
col_list = ['阻值(欧姆)','容值','电感值','额定电压','额定电流','功率','直流电阻(内阻)','精度','封装','温漂系数(介质材料)']
res_map = {
'μΩ': 0.000001,
'mΩ': 0.001,
'Ω': 1,
'kΩ': 1000,
'MΩ': 1000000,
'GΩ': 1000000000,
'TΩ': 1000000000000
}
cap_map = {
'pF': 1,
'nF': 1000,
'µF': 1000000,
'mF': 1000000000,
'F': 1000000000000
}
ind_map = {
'pH': 1,
'nH': 1000,
'µH': 1000000,
'mH': 1000000000,
'H' : 1000000000000,
}
cur_map = {
'mA': 1,
'A': 1000
}
vol_map = {
'V': 1,
'kV': 1000
}
pow_map = {
'W': 1,
'kW': 1000
}
acc_map = {
'%': 0.01
}
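# Note: each map above normalizes a parsed value to one base unit so attribute
# values can be compared numerically: Ω for resistance, pF for capacitance,
# pH for inductance, mA for current, V for voltage, W for power, and a 0-1
# fraction for tolerance.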
encap_list = ["0030", "008004", "01005", "015008","0201","02016","0202","0204","03015","03019","0302","0303","0306","0402","0404","0405","0406","0502","0503","0504","0505","0508",
"0602","0603","0604","0605","0606","0612","0704","0707","0709","0803","0804","0805","0806","0815","0830","0905","0909","1003","1005","1006","1007","1008","1010","1012",
"1020","1106","1111","1113","1204","1205","1206","1207","1208","1209","1210","1211","1212","1213","1218","1225","1230","1246","1305","1410","1411","1412","1414","1505","1507",
"1510","1512","1515","1530","1606","1608","1611","1612","1616","1625","1708","1715","1805","1806","1807","1808","1810","1812","1816","1825","1835","1913","1916","1919","2005",
"2008","2010","2015","2016","2018","2020","2023","2043","2211","2214","2215","2218","2220","2225","2304","2312","2323","2325","2410","2414","2416","2420","2423","2424","2512",
"2520","2525","2615","2616","2706","2709","2711","2721","2727","2728","2812","2820","2824","2825","2828","2830","2910","2913","2915","2917","2920","2924","3010","3015","3017",
"3022","3024","3025","3040","3131","3225","3226","3312","3318","3333","3640","3838","3920","3925","3931","4030","4032","4122","4520","4823","4850","5040","5329","5829","5929",
"6028","6030","6031","6039","6054","6560"]
cast_map = {
'毫欧': 'mΩ',
'欧姆': 'Ω',
'千欧': 'KΩ',
'兆欧': 'MΩ',
'最大': ''
}
brand_map = {
'Yageo': 1,
'Murata': 2,
'Samsung': 3,
'TDK': 4,
'Taiyo Yuden': 5,
'Rohm': 6,
'KEMET': 7,
'Vishay': 8,
'AVX': 9,
'Panasonic': 10,
'KOA': 11,
'Bourns': 12,
'TE': 13,
'Wurth': 14,
'Pulse': 15
}
temp_map = ["C0G","NP0","COG","NPO","X7R","X5R","Y5V","X6S","X7S",
"X7T","SL","U2J","UJ","X7U","X8R","Z5U","C0H","COH",
"U2K","X6T","X8G","X8L","Y5R","Y5U","ZLM"]
sep_temp_map = ["C0G","NP0","COG","NPO"]
host_list = [
# {"host": "192.168.2.232", "port": 9200},
{"host": "172.18.137.30", "port": 9211},
]
client = Elasticsearch(host_list)
index = "goods_map"
type = "goods"
def load_excel():
data = ExcelHandler.read_to_excel('E:\\doc\\DGK参数.xlsx', 'Sheet1', 1)
# data = ExcelHandler.read_to_excel('/home/liexin521/GDB参数0630.xlsx', 'Sheet1', 1)
data.pop(0)
attr_dict = {}
for row in data:
attr_id = int(row[0])
field = row[3]
attr_dict[attr_id] = field
return attr_dict
def read_db(attr_dict):
conn = pymysql.connect("192.168.2.232", "bigdata", "bigdata2019", "digikey", charset='utf8')
# conn = pymysql.connect("172.18.137.37", "bigdata", "bdYm2yy2mmyzlmlly", "digikey", charset='utf8')
sql = "SELECT g.goods_id,g.goods_name,b.brand_name FROM lie_goods g LEFT JOIN lie_brand b ON g.brand_id = b.brand_id WHERE g.is_delete = 0"
goods_rst = DBHandler.read(conn, sql)
index = 0
field_dict = {
0: [],
1: [],
2: [],
3: [],
4: [],
5: [],
6: [],
7: [],
8: [],
9: [],
}
for row in goods_rst:
goods_id = str(row[0])
goods_name = row[1]
brand_name = str(row[2]).upper()
field = goods_id[-1:]
field_dict[int(field)].append({'goods_id': goods_id, 'goods_name': goods_name, 'brand_name': brand_name})
index += 1
if index % 10000 == 0:
print(index)
return field_dict
def upload_es():
# conn = pymysql.connect("192.168.1.235", "learning_data", "learning_data#zsyM", "learning_data", charset='utf8')
conn = pymysql.connect("localhost", "lEaUx", "Xp#psOlT@!@#lfs^%wl", "learning_data", charset='utf8')
getcontext().prec = 15
def load_db():
sql = "SELECT resistance,capacity,inductance,rated_voltage,rated_current,power,dc_resistance,accuracy,encap,temperature,goods_name,brand_name,brand_sort,id,update_time FROM lie_goods_attrs"
return DBHandler.read(conn, sql)
def remove_exponent(num):
return str(num.to_integral()) if num == num.to_integral() else str(num.normalize())
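# remove_exponent renders a Decimal without an exponent or trailing zeros,
# e.g. Decimal('4700.0') -> '4700', Decimal('4.75') -> '4.75'.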
# Resistance (ohms)
def trans_resistance(key):
if key == '-' or key == '':
return key, ''
key = str(key).replace("Ohms", 'Ω')
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]Ω|µΩ|Ω)$', key.replace(" ",""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
if res_map.get(unit):
ts_value = remove_exponent(Decimal(Decimal(value) * Decimal(res_map[unit])))
else:
ts_value = ''
return key, ts_value
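# Illustrative behaviour: trans_resistance('10 kOhms') -> ('10 kΩ', '10000');
# unrecognised units yield an empty normalized value.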
# Capacitance
def trans_capacity(key):
if key == '-' or key == '':
return key, ''
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]F|µF|F)$', key.replace(" ", ""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal(cap_map[unit]))) if cap_map.get(unit) else ''
# Inductance
def trans_inductance(key):
if key == '-' or key == '':
return key, ''
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]H|µH|H)$', key.replace(" ", ""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal(ind_map[unit]))) if ind_map.get(unit) else ''
# Rated voltage
def trans_rated_voltage(key):
if key == '-' or key == '':
return key, ''
key = str(key).split("(")[0]
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]V|V)$', key.replace(" ", ""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal(vol_map[unit]))) if vol_map.get(unit) else ''
# Rated current
def trans_rated_current(key):
if key == '-' or key == '':
return key, ''
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]A|A)$', key.replace(" ", ""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal(cur_map[unit]))) if cur_map.get(unit) else ''
# Power
def trans_power(key):
if key == '-' or key == '':
return key, ''
key = str(key).split(",")[0]
obj = re.match(r'(\d+|\d+\.\d+)([a-zA-Z]W|W)$', key.replace(" ", ""))
value = obj.group(1) if obj else '0'
unit = obj.group(2) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal(pow_map[unit]))) if pow_map.get(unit) else ''
# DC resistance
def trans_dc_resistance(key):
if key == '-' or key == '':
return key, ''
for ct in cast_map:
if str(key).find(ct) != -1:
key = str(key).replace(ct, cast_map[ct])
return trans_resistance(key)
# Tolerance (accuracy)
def trans_accuracy(key):
if key == '-' or key == '':
return key, ''
obj = re.match(r'(±|)(\d+|\d+\.\d+)([a-zA-Z]+|%)$', key.replace(" ", ""))
value = obj.group(2) if obj else '0'
unit = obj.group(3) if obj else 1
return key, remove_exponent(Decimal(Decimal(value) * Decimal("0.01")))
# Package
def trans_encap(key):
if key == '-' or key == '':
return key, ''
key = str(key).split(",")[0]
for encap in encap_list:
if str(key).find(encap) != -1:
return key, encap
return key, ''
# Temperature coefficient
def trans_temp(key):
if key == '-' or key == '':
return key, ''
for sep_temp in sep_temp_map:
if str(key).find(sep_temp) != -1:
return key, "C0G"
for temp in temp_map:
if str(key).find(temp) != -1:
return key, temp
return key, ''
ACTIONS = []
data = load_db()
for row in data:
resistance = row[0]
capacity = row[1]
inductance = row[2]
rated_voltage = row[3]
rated_current = row[4]
power = row[5]
dc_resistance = row[6]
acc_accuracy = row[7]
encap = row[8]
temperature = row[9]
goods_name = row[10]
brand_name = row[11]
brand_sort = row[12]
auto_id = row[13]
update_time = row[14]
# Resistance (ohms)
re_key, re_value = trans_resistance(resistance)
# Capacitance
cap_key, cap_value = trans_capacity(capacity)
# Inductance
ind_key, ind_value = trans_inductance(inductance)
# Rated voltage
vol_key, vol_value = trans_rated_voltage(rated_voltage)
# Rated current
cur_key, cur_value = trans_rated_current(rated_current)
# Power
pow_key, pow_value = trans_power(power)
# DC resistance
dc_key, dc_value = trans_dc_resistance(dc_resistance)
# Tolerance
acc_key, acc_value = trans_accuracy(acc_accuracy)
# Package
encap_key, encap_value = trans_encap(encap)
# Temperature coefficient
temp_key, temp_value = trans_temp(temperature)
# print(temp_key, temp_value)
# Build the full-text search string
key_list = [re_key,cap_key,ind_key,vol_key,cur_key,pow_key,dc_key,acc_key,encap_key, temp_key]
key_str = " ".join(filter(lambda x:x != '',key_list))
# Build the ES bulk-upload actions
attrs = []
value_list = [re_value,cap_value,ind_value,vol_value,cur_value,pow_value,dc_value,acc_value,encap_value,temp_value]
for key,value in zip(col_list,value_list):
if value != '':
attrs.append({'attr_value': '%s€%s' % (key,value)})
ACTIONS.append(
{
"_index": index,
"_type": type,
"_id": auto_id,
"_source": {
"auto_id": auto_id,
"brand_name": brand_name,
"brand_sort": brand_sort,
"goods_name": goods_name,
"attr_str": key_str,
"attrs": attrs,
"update_time": update_time
}
}
)
if len(ACTIONS) % 500 == 0:
load_es(ACTIONS)
ACTIONS.clear()
if len(ACTIONS) > 0:
load_es(ACTIONS)
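# Documents are flushed to ES in batches of 500 actions; the final partial
# batch is sent after the loop.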
def load_es(ACTIONS):
count = 0
while count < 3:
count += 1
try:
success, _ = bulk(client, ACTIONS, raise_on_error=True)
print('Performed %d actions' % success)
return
except:
print("wrong")
def write_db(data, field):
index = 0
conn1 = pymysql.connect("192.168.2.232", "bigdata", "bigdata2019", "digikey", charset='utf8')
# conn1 = pymysql.connect("172.18.137.37", "bigdata", "bdYm2yy2mmyzlmlly", "digikey", charset='utf8')
conn2 = pymysql.connect("192.168.1.235", "learning_data", "learning_data#zsyM", "learning_data", charset='utf8')
# conn2 = pymysql.connect("localhost", "dashboard", "ichunt5Dashboard@", "dashboard", charset='utf8')
for row in data:
goods_id = row['goods_id']
goods_name = row['goods_name']
brand_name = row['brand_name']
brand_sort = 999
for brand in brand_map:
if brand.upper() in brand_name:
brand_sort = brand_map[brand]
break
index += 1
sql = "SELECT attr_name,attr_value,attr_id FROM lie_goods_attr_fields%d WHERE goods_id = %d" % (int(field), int(goods_id))
col_lt = []
col_value_lt = []
attr_rst = DBHandler.read(conn1, sql)
for ast in attr_rst:
attr_name = ast[0]
attr_value = ast[1]
attr_id = ast[2]
if attr_dict.get(attr_id) and col_dict[attr_dict.get(attr_id)] not in col_lt:
col_lt.append(col_dict[attr_dict.get(attr_id)])
col_value_lt.append("'" + str(attr_value) + "'")
if len(col_lt) > 0:
col = ",".join(col_lt)
col_value = ",".join(col_value_lt)
sql = "INSERT INTO lie_goods_attrs (goods_id,goods_name,brand_name,brand_sort,%s,update_time) VALUES ('%s','%s','%s',%d,%s,%d)" % (col, goods_id, goods_name, brand_name, brand_sort, col_value, DateHandler.now_datetime())
DBHandler.insert(conn2, sql)
if index % 10000 == 0:
print(str(field) + " : " + str(index))
if __name__ == '__main__':
# upload_es()
# test()
attr_dict = load_excel()
field_dict = read_db(attr_dict)
thread_list = []
t0 = threading.Thread(target=write_db, args=(field_dict[0], 0,))
t1 = threading.Thread(target=write_db, args=(field_dict[1], 1,))
t2 = threading.Thread(target=write_db, args=(field_dict[2], 2,))
t3 = threading.Thread(target=write_db, args=(field_dict[3], 3,))
t4 = threading.Thread(target=write_db, args=(field_dict[4], 4,))
t5 = threading.Thread(target=write_db, args=(field_dict[5], 5,))
t6 = threading.Thread(target=write_db, args=(field_dict[6], 6,))
t7 = threading.Thread(target=write_db, args=(field_dict[7], 7,))
t8 = threading.Thread(target=write_db, args=(field_dict[8], 8,))
t9 = threading.Thread(target=write_db, args=(field_dict[9], 9,))
thread_list.append(t0)
thread_list.append(t1)
thread_list.append(t2)
thread_list.append(t3)
thread_list.append(t4)
thread_list.append(t5)
thread_list.append(t6)
thread_list.append(t7)
thread_list.append(t8)
thread_list.append(t9)
for t in thread_list:
t.setDaemon(True)  # daemon thread: it would be killed if the main thread exited; the join() below waits for completion
t.start()
for t in thread_list:
t.join()  # the main thread waits for all worker threads to finish
print('Mainthread %s ended.' % threading.current_thread().name)
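# Design note: goods are sharded into ten buckets by the last digit of
# goods_id (see read_db), and one writer thread per bucket inserts the
# attributes into lie_goods_attrs.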
......@@ -6,9 +6,11 @@ from pipeline.pi_activity import PiActivity
from pipeline.pi_daily import PiDaily
from pipeline.pi_weekly import PiWeekly
from pipeline.pi_lx_log import PiLxLog
from pipeline.pi_api_log import PiApiLog
from pipeline.pi_lx_log_url import PiLxLogUrl
from pipeline.pi_rank import PiRank
from utils.date_handler import DateHandler
from pipeline.pi_lc import PiLc
import sys
......@@ -33,6 +35,8 @@ if __name__ == '__main__':
PiWeekly.pipeline_week_hot_goods()
if sys.argv[1] == 'week_classify_hot_goods': # weekly hot-selling level-1/level-2 materials
PiWeekly.pipeline_week_classify_hot_goods()
if sys.argv[1] == 'no_result_keyword_weekly': # weekly no-result keywords
PiWeekly().pipeline_no_result_keyword_weekly()
if sys.argv[1] == 'lx_brand': # weekly brands
PiWeekly.pipeline_lx_brand()
if sys.argv[1] == 'lx_order': # weekly orders
......@@ -42,6 +46,10 @@ if __name__ == '__main__':
if sys.argv[1] == 'lx_log': # log parsing
pi_lx_log = PiLxLog()
pi_lx_log.pipeline_lx_log()
if sys.argv[1] == 'api_log':
pi_api_log = PiApiLog()
pi_api_log.merge_file()
pi_api_log.wash_data()
if sys.argv[1] == 'lx_log_url':
pi_lx_log_url = PiLxLogUrl()
pi_lx_log_url.pipeline_lx_log_url()
......@@ -54,4 +62,14 @@ if __name__ == '__main__':
PiRank().rank_one(start_time, end_time)
if sys.argv[1] == 'lx_rank3':
PiRank().rank_three()
if sys.argv[1] == 'keyword_match':
PiWeekly.pipeline_keyword_match()
if sys.argv[1] == 'no_result_weekly': # weekly no-result keywords
PiWeekly().pipeline_no_result_keyword_weekly()
if sys.argv[1] == 'lc_goods': # 立创 (LCSC) goods details
PiLc().pipeline_lc_order()
if sys.argv[1] == 'zy_summary_daily':
PiGoods.pipeline_zy_daily_summary()
["0030",
"008004",
"01005",
"015008",
"0201",
"02016",
"0202",
"0204",
"03015",
"03019",
"0302",
"0303",
"0306",
"0402",
"0404",
"0405",
"0406",
"0502",
"0503",
"0504",
"0505",
"0508",
"0602",
"0603",
"0604",
"0605",
"0606",
"0612",
"0704",
"0707",
"0709",
"0803",
"0804",
"0805",
"0806",
"0815",
"0830",
"0905",
"0909",
"1003",
"1005",
"1006",
"1007",
"1008",
"1010",
"1012",
"1020",
"1106",
"1111",
"1113",
"1204",
"1205",
"1206",
"1207",
"1208",
"1209",
"1210",
"1211",
"1212",
"1213",
"1218",
"1225",
"1230",
"1246",
"1305",
"1410",
"1411",
"1412",
"1414",
"1505",
"1507",
"1510",
"1512",
"1515",
"1530",
"1606",
"1608",
"1611",
"1612",
"1616",
"1625",
"1708",
"1715",
"1805",
"1806",
"1807",
"1808",
"1810",
"1812",
"1816",
"1825",
"1835",
"1913",
"1916",
"1919",
"2005",
"2008",
"2010",
"2015",
"2016",
"2018",
"2020",
"2023",
"2043",
"2211",
"2214",
"2215",
"2218",
"2220",
"2225",
"2304",
"2312",
"2323",
"2325",
"2410",
"2414",
"2416",
"2420",
"2423",
"2424",
"2512",
"2520",
"2525",
"2615",
"2616",
"2706",
"2709",
"2711",
"2721",
"2727",
"2728",
"2812",
"2820",
"2824",
"2825",
"2828",
"2830",
"2910",
"2913",
"2915",
"2917",
"2920",
"2924",
"3010",
"3015",
"3017",
"3022",
"3024",
"3025",
"3040",
"3131",
"3225",
"3226",
"3312",
"3318",
"3333",
"3640",
"3838",
"3920",
"3925",
"3931",
"4030",
"4032",
"4122",
"4520",
"4823",
"4850",
"5040",
"5329",
"5829",
"5929",
"6028",
"6030",
"6031",
"6039",
"6054",
"6560"]
import requests
import time
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
from config.conn_list import ConnList
class Base:
def __init__(self, name):
self.name = name
self.result = {}
self.requests = requests
self.time = time
self.DateHandler = DateHandler
self.DBHandler = DBHandler
self.ConnList = ConnList
print(name)
# _*_ coding:utf-8 _*_
import subprocess
import time
import re
import traceback
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
from config.conn_list import ConnList
class PiApiLog:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.db = ConnList.Dashboard()
self.day = DateHandler.now_date(1, 1)
self.before_day = DateHandler.now_date(2, 1)
def merge_file(self):
# Copy HDFS data to the local disk
day = self.day
bf_day = self.before_day
hdfs_file = '/api_log/api_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
des_file = ' /data3/hdfs_data/api_log'
cmd = 'hadoop dfs -get ' + hdfs_file + des_file
subprocess.getoutput(cmd)
# Merge the small files
cmd = 'find' + des_file + ' -type f -exec cat {} \; >' + des_file + '/sum'
subprocess.getoutput(cmd)
# Upload the merged file back to HDFS
cmd = 'hadoop dfs -put /data3/hdfs_data/api_log/sum ' + '/api_log/api_log_' + day.split('-')[0][2:4] + \
day.split('-')[1] + day.split('-')[2]
subprocess.getoutput(cmd)
# Delete the local files
cmd = 'rm -f /data3/hdfs_data/api_log/*'
subprocess.getoutput(cmd)
# Delete the small files on HDFS
hdfs_file = '/api_log/api_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
cmd = 'hadoop dfs -rm -r ' + hdfs_file
subprocess.getoutput(cmd)
# Delete the previous day's HDFS sum file
hdfs_file = '/api_log/api_log_' + bf_day.split('-')[0][2:4] + bf_day.split('-')[1] + bf_day.split('-')[2] + '/sum'
cmd = 'hadoop dfs -rm -r ' + hdfs_file
subprocess.getoutput(cmd)
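# Illustrative expansion of the commands above for day '2020-11-01' (note the
# leading space in des_file, which makes the bare string concatenation valid shell):
#   hadoop dfs -get /api_log/api_log_201101/small/* /data3/hdfs_data/api_log
#   find /data3/hdfs_data/api_log -type f -exec cat {} \; > /data3/hdfs_data/api_log/sum
#   hadoop dfs -put /data3/hdfs_data/api_log/sum /api_log/api_log_201101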
def get_api_data(self):
# Read the HDFS data
day = self.day
hdfs_file = '/api_log/api_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + "/sum"
hdfs_data = DBHandler.hdfs_read(hdfs_file)
return hdfs_data
#
def wash_data(self):
data = self.get_api_data()
url_dt = {}
for row in data:
try:
row = row.decode("utf-8")
arr = row.split(" ")
ip = arr[0]
time_base = arr[3]
url = str(arr[6])
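# Strip the query string so hits aggregate per endpoint path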
if "?" in url:  # str.find() returns -1 (truthy) when absent, so test membership instead
url = str(url).split("?")[0]
if url_dt.get(url):
url_dt[url] += 1
else:
url_dt[url] = 1
except:
traceback.print_exc()
for url in url_dt:
count = url_dt[url]
self.insert_api_log(url, count)
def insert_api_log(self, url, count):
try:
sql = "INSERT INTO lie_api_log (url,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (url, count, self.day)
DBHandler.insert(self.db, sql)
except:
traceback.print_exc()
......@@ -14,6 +14,7 @@ from config.conn_list import ConnList
from pipeline.pi_email import PiEmail
from translate.ts_daily import TsDaily
from translate.ts_crm import TsCrm
from extract.ex_baidu import ExBaidu
from utils.msg_handler import MsgHandler
import traceback
import time
......@@ -188,21 +189,24 @@ class PiDaily:
'cal_ts': DateHandler.now_date(1, 1)}],
db=ConnList.Dashboard(), cal_time=False)
# Shence (Sensors Analytics) data
# Shence data -> Baidu data
try:
pv = ExShenCe('').ex_pv()
uv = ExShenCe('').ex_uv()
sum_pv = ExShenCe('').sum_pv()
sum_uv = ExShenCe('').sum_uv()
print(pv, uv, sum_pv, sum_uv)
sc_data = {'pv': sum_pv[0], 'uv': sum_uv[0], 'pc_pv': pv['PC'][0], 'pc_uv': uv['PC'][0],
'h5_pv': pv['H5'][0], 'h5_uv': uv['H5'][0], 'cal_ts': DateHandler.now_date(1, 1)}
pv,uv = ExBaidu('').ex_puv()
print(pv, uv)
sc_data = {'pv': pv, 'uv': uv, 'pc_pv': 0, 'pc_uv': 0, 'h5_pv': 0, 'h5_uv': 0, 'cal_ts': DateHandler.now_date(1, 1)}
col = ['pv', 'uv', 'pc_pv', 'pc_uv', 'h5_pv', 'h5_uv', 'cal_ts']
LoadMysql.simple_dict_load(col=col, table='lie_shence_cal', data=[sc_data],
db=ConnList.Dashboard(), cal_time=False)
sc_data['pc_pv'] = '-'
sc_data['pc_uv'] = '-'
sc_data['h5_pv'] = '-'
sc_data['h5_uv'] = '-'
except:
pass
# TODO connection test
# Send the email
keyword = 'report-daily'
send_data = {'yes_ts': DateHandler.now_date(0, 1)}
......@@ -570,13 +574,14 @@ class PiDaily:
db=ConnList.Dashboard(),
cal_time=False)
#
PiEmail.pipeline_operate_weekly()
@staticmethod
def pipeline_crm_user():
start_time = 0
end_time = DateHandler.date_time(0)
end_time = DateHandler.now_datetime()
db = ConnList.Order()
wr_db = ConnList.WrCrm()
......
......@@ -307,6 +307,8 @@ class PiErp:
MsgHandler.send_dd_msg('【风控报警】:%s' % traceback.format_exc())
# Aggregate calculation
month_weight = {}
for cmp_code in cmp_dt:
try:
fmt_dt = cmp_dt[cmp_code]
......@@ -314,6 +316,16 @@ class PiErp:
for fmt in fmt_dt:
cmp_detail = fmt_dt[fmt]
if fmt not in month_weight:
month_weight[fmt] = {
'total_amount' : 0,
'weight_A' : 0,
'weight_B' : 0,
'period_weight_A' : 0,
'period_weight_B' : 0,
'period_platform_level': 0
}
return_amount = cmp_detail['return_amount']
unload_amount = cmp_detail['unload_amount']
receive_amount = cmp_detail['receive_amount']
......@@ -334,9 +346,6 @@ class PiErp:
appoint_tolerance = cmp_detail['appoint_tolerance']
# Time format conversion
#cmp_detail['period_day'] /= 86400
# Algorithm: the current customer's credit-period usage level
if (receive_count != 0) and (appoint_tolerance != 0):
cmp_detail['period_user_level'] = period_day / receive_count / appoint_tolerance
......@@ -344,9 +353,61 @@ class PiErp:
# Total delivery amount
cmp_detail['delivery_amount'] = return_amount + unload_amount
# Weighted calculation
delay_avg_day = cmp_detail['delay_avg_day']
month_weight[fmt]['weight_A'] += delay_avg_day * receive_amount
month_weight[fmt]['weight_B'] += appoint_tolerance * receive_amount
month_weight[fmt]['total_amount'] += receive_amount
except:
MsgHandler.send_dd_msg('【风控报警】:%s' % traceback.format_exc())  # format_exc(), not print_exc(), returns the traceback string
# Weighted calculation for the current month
for fmt in month_weight:
period_weight_A = 0
period_weight_B = 0
period_platform_level = 0
total_amount = month_weight[fmt]['total_amount']
weight_A = month_weight[fmt]['weight_A']
weight_B = month_weight[fmt]['weight_B']
if total_amount != 0:
period_weight_A = weight_A / total_amount
period_weight_B = weight_B / total_amount
if period_weight_B != 0:
period_platform_level = period_weight_A / period_weight_B
month_weight[fmt]['period_weight_A'] = period_weight_A
month_weight[fmt]['period_weight_B'] = period_weight_B
month_weight[fmt]['period_platform_level'] = period_platform_level
for cmp_code in cmp_dt:
try:
fmt_dt = cmp_dt[cmp_code]
for fmt in fmt_dt:
cmp_detail = fmt_dt[fmt]
period_use_times_single = 0
period_user_level = cmp_detail['period_user_level']
period_platform_level = month_weight[fmt]['period_platform_level']
period_weight_A = month_weight[fmt]['period_weight_A']
period_weight_B = month_weight[fmt]['period_weight_B']
total_amount = month_weight[fmt]['total_amount']
weight_A = month_weight[fmt]['weight_A']
weight_B = month_weight[fmt]['weight_B']
if period_platform_level != 0:
period_use_times_single = period_user_level / period_platform_level
cmp_detail['period_use_times_single'] = period_use_times_single
except:
MsgHandler.send_dd_msg('【风控报警】:%s' % traceback.format_exc())  # format_exc(), not print_exc(), returns the traceback string
# Write the data
self.ts_erp.load_erp_month(cmp_dt, table, credit)
......@@ -379,7 +440,6 @@ class PiErp:
total_use_level = 0
user_level = 0
period_platform_level = 0
period_use_times_single = 0
period_use_times_six = 0
period_user_level_six = 0
......@@ -409,7 +469,6 @@ class PiErp:
delay_avg_day = sd['delay_avg_day']
if period_platform_level != 0:
period_use_times_single = use_level / period_platform_level
period_user_level_six = delay_avg_day / total_tolerance if total_tolerance != 0 else 0
period_use_times_six = period_user_level_six / period_platform_level
......@@ -419,7 +478,6 @@ class PiErp:
'period_weight_A' : period_weight_A,
'period_weight_B' : period_weight_B,
'period_platform_level' : period_platform_level,
'period_use_times_single' : period_use_times_single,
'period_use_times_six' : period_use_times_six
}
......
......@@ -151,5 +151,18 @@ class PiGoods:
db=ConnList.Dashboard(),
cal_time=False)
"""
Push the total count of valid consignment SKUs
"""
@staticmethod
def effect_sku():
ts = TsGoods('', data='')
total = ts.trans_all_up()
ts.trans_open_falcon(total=total)
from pipeline.Base import Base
class PiLc(Base):
def __init__(self):
super().__init__('')
def pipeline_lc_order(self):
MAX_REPEAT = 10
SLEEP_TIME = 3
today_ts = self.DateHandler.date_time(0)
yeday_ts = self.DateHandler.date_time(1)
now_ts = self.DateHandler.now_datetime()
now_day = self.DateHandler.now_date(1, 1)
repeat = 0
data = []
pt = 1
self.result['order_count'] = 0
self.result['paid_count'] = 0
self.result['total_price'] = 0
while(repeat < MAX_REPEAT):
try:
url = "http://localhost:8888/elec/szlc_zzl?pt=%d" % pt
r = self.requests.get(url, timeout=5)
res = r.json()
print(res)
self.time.sleep(SLEEP_TIME)
if 'message' in res:
repeat += 1
mes = res['message']
if mes == 'request time out':
continue
elif mes == 'search no result':
break
if 'data' in res and 'hasNext' in res and 'pageToken' in res:
pt = int(res['pageToken'])
hasNext = res['hasNext']
if hasNext is not True:
break
res_data = res['data']
for row in res_data:
orderDT = self.DateHandler.str_to_unix(row['orderDT'])
totalPrice = row['totalPrice']
paymentStatus = 1 if row['paymentStatus'] == '已付' else 0
if orderDT > today_ts:
continue
if yeday_ts <= orderDT <= today_ts:
data.append(row)
self.result['order_count'] += 1
self.result['paid_count'] += paymentStatus
self.result['total_price'] += totalPrice
elif orderDT < yeday_ts:
repeat = MAX_REPEAT
break
except:
raise Exception
if self.result['order_count'] > 0:
self.result['avg_prices'] = self.result['total_price'] / self.result['order_count']
self.result['pay_rate'] = str(round(self.result['paid_count'] / self.result['order_count'], 2) * 100) + '%'
else:
self.result['avg_prices'] = 0
self.result['pay_rate'] = '0%'
print(self.result)
db = self.ConnList.Dashboard()
sql = "INSERT INTO lie_lc_quota (order_count,avg_prices,pay_rate,total_prices,cal_ts) \
VALUES(%d,%f,'%s',%f,'%s')" % (self.result['order_count'],self.result['avg_prices'],
self.result['pay_rate'],self.result['total_price'],now_day)
self.DBHandler.insert(db, sql)
bigdata_db = self.ConnList.Bigdata()
print(len(data))
for row in data:
orderID = row['orderID']
clientName = row['clientName']
clientID = row['clientID']
orderTS = row['orderTS']
totalPrice = row['totalPrice']
paymentStatus = row['paymentStatus']
couponsStatus = ''
orderStatus = row['orderStatus']
sql = "INSERT INTO lie_szlc_order_info (orderID,clientName,clientID,orderTS,totalPrice,paymentStatus,couponsStatus,orderStatus,ctime,mtime) \
VALUES ('%s','%s','%s','%s','%s','%s','%s','%s',%d,%d)" % (orderID,clientName,clientID,orderTS,totalPrice,paymentStatus,couponsStatus,orderStatus,now_ts,now_ts)
self.DBHandler.insert(bigdata_db,sql)
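# Design note: pipeline_lc_order pages through /elec/szlc_zzl via pageToken
# until it reaches orders older than yesterday, then stores the day's totals
# in lie_lc_quota and the raw rows in lie_szlc_order_info.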
import subprocess
import time
import re
import traceback
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
from config.conn_list import ConnList
......@@ -24,7 +24,8 @@ class PiLxLog:
'ic': '((.*)/ic/(.*))',
'info': '((.*)news.html(.*)|(.*)/article/(.*))'
}
self.block_ip = ['39.156.65', '47.92.125', '14.215.176']
self.block_ip = ['39.156.65', '47.92.125', '14.215.176', '121.40.171.123', '47.111.224.39']
self.white_ip = ['123.125.71', '220.181.108', '111.206.221', '111.206.198', '124.166.232']
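# block_ip entries are matched as substrings of the visitor IP; white_ip lists
# crawler IP prefixes, though the whitelist check in wash_data is commented out.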
def merge_file(self):
......@@ -53,6 +54,7 @@ class PiLxLog:
subprocess.getoutput(cmd)
def merge_ly_file(self):
print('merge_ly_file')
# Copy HDFS data to the local disk
day = self.day
......@@ -79,6 +81,8 @@ class PiLxLog:
subprocess.getoutput(cmd)
def get_ly_hdfs_data(self):
print('get_ly_hdfs_data')
# Read the HDFS data
day = self.day
hdfs_file = '/ly_log/ly_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + "/sum"
......@@ -93,6 +97,7 @@ class PiLxLog:
return hdfs_data
def wash_data(self, data):
print(len(data))
f = open('/data3/hdfs_data/spider_url/url.txt', 'w')
for row in data:
try:
......@@ -109,6 +114,13 @@ class PiLxLog:
if block_ip in ip:
continue
# Whitelist IPs
# if ip not in self.white_ip:
# continue
if '.php' in url or '.asp' in url:
continue
time_str = self.time_analyzing(time_base)
time_ts = DateHandler.str_to_unix(time_str)
page = self.url_analyzing(url)
......@@ -124,14 +136,14 @@ class PiLxLog:
else:
self.status_count[status] += 1
if status >= 400:
if status == 404:
self.insert_seo_url(url, status, time_ts, 'lie_seo_spider_url')
else:
line = url + "|" + str(time_ts) + "|" + ip
f.write(line + "\n")
f.flush()
except:
pass
traceback.print_exc()
self.insert_seo_status(self.status_count, 'lie_seo_spider_status')
self.insert_seo_daily(self.page_count)
......@@ -140,6 +152,8 @@ class PiLxLog:
f.close()
def wash_ly_data(self, data):
print('wash')
for row in data:
try:
......@@ -162,7 +176,7 @@ class PiLxLog:
self.insert_seo_url(url, status, time_ts, 'lie_seo_ly_url')
except:
pass
traceback.print_exc()
self.insert_seo_status(self.status_count, 'lie_seo_ly_status')
......@@ -224,23 +238,35 @@ class PiLxLog:
self.wash_ly_data(data)
def insert_seo_url(self, url, status, time_ts, table):
sql = "INSERT INTO %s (url,status,cal_ts,spider_time) VALUES (\'%s\',\'%s\',\'%s\',%d)" % (table,url,status,self.day,time_ts)
DBHandler.insert(self.db, sql)
try:
sql = "INSERT INTO %s (url,status,cal_ts,spider_time) VALUES (\'%s\',\'%s\',\'%s\',%d)" % (table,url,status,self.day,time_ts)
DBHandler.insert(self.db, sql)
except:
traceback.print_exc()
def insert_seo_status(self, status_count, table):
for status in status_count:
count = status_count[status]
sql = "INSERT INTO %s (status,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (table, status,count,self.day)
DBHandler.insert(self.db, sql)
try:
for status in status_count:
count = status_count[status]
sql = "INSERT INTO %s (status,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (table, status,count,self.day)
DBHandler.insert(self.db, sql)
except:
traceback.print_exc()
def insert_seo_daily(self, page_count):
for page in page_count:
count = page_count[page]
sql = "INSERT INTO lie_seo_spider_daily (page,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (page,count,self.day)
DBHandler.insert(self.db, sql)
try:
for page in page_count:
count = page_count[page]
sql = "INSERT INTO lie_seo_spider_daily (page,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (page,count,self.day)
DBHandler.insert(self.db, sql)
except:
traceback.print_exc()
def insert_seo_ip_section(self, ip_section):
for ip in ip_section:
count = ip_section[ip]
sql = "INSERT INTO lie_seo_spider_ip_section (ip_section,spider_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
try:
for ip in ip_section:
count = ip_section[ip]
sql = "INSERT INTO lie_seo_spider_ip_section (ip_section,spider_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
except:
traceback.print_exc()
......@@ -70,57 +70,72 @@ class PiLxLogUrl:
return url.split('?')[0]
def read_data(self):
sql = "SELECT id,url,ip,is_include FROM lie_seo_bd_record"
data = DBHandler.read(self.db, sql)
return data
try:
sql = "SELECT id,url,ip,is_include FROM lie_seo_bd_record"
data = DBHandler.read(self.db, sql)
return data
except:
pass
def include_baidu(self, data):
p1 = mp.Pool(processes=10)
for row in data:
tid = row[0]
url = row[1]
url = "http://127.0.0.1:12588/seocheck?target=www.ichunt.com" + url
p1.apply_async(deal_data, (url, tid, 0.1, 'lie_seo_bd_record'))
p1.close()
p1.join()
try:
p1 = mp.Pool(processes=10)
for row in data:
tid = row[0]
url = row[1]
url = "http://127.0.0.1:12588/seocheck?target=www.ichunt.com" + url
p1.apply_async(deal_data, (url, tid, 0.1, 'lie_seo_bd_record'))
p1.close()
p1.join()
except:
pass
# Index-inclusion analysis
def include_analyzing(self, data):
for row in data:
ip = row[2]
is_include = row[3]
if (re.match('([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)', ip, re.M | re.I)) and (is_include == 1):
ip_arr = ip.split('.')
new_ip = '%s.%s.%s' % (ip_arr[0], ip_arr[1], ip_arr[2])
if new_ip not in self.ip_section:
self.ip_section[new_ip] = 1
else:
self.ip_section[new_ip] += 1
try:
for row in data:
ip = row[2]
is_include = row[3]
if (re.match('([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)', ip, re.M | re.I)) and (is_include == 1):
ip_arr = ip.split('.')
new_ip = '%s.%s.%s' % (ip_arr[0], ip_arr[1], ip_arr[2])
if new_ip not in self.ip_section:
self.ip_section[new_ip] = 1
else:
self.ip_section[new_ip] += 1
except:
pass
# Inclusion analysis for specified pages
def include_appoint_analyzing(self):
sql = "SELECT id,url FROM lie_seo_appoint_record"
result = DBHandler.read(self.db, sql)
for row in result:
tid = row[0]
url = "http://127.0.0.1:12588/seocheck?target=" + row[1]
deal_data(url, tid, 0.1, 'lie_seo_appoint_record')
try:
sql = "SELECT id,url FROM lie_seo_appoint_record"
result = DBHandler.read(self.db, sql)
for row in result:
tid = row[0]
url = "http://127.0.0.1:12588/seocheck?target=" + row[1]
deal_data(url, tid, 0.1, 'lie_seo_appoint_record')
except:
pass
# Insert inclusion stats
def insert_seo_ip_section(self):
for ip in self.ip_section:
count = self.ip_section[ip]
sql = "INSERT INTO lie_seo_include_ip_section (ip_section,include_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
try:
for ip in self.ip_section:
count = self.ip_section[ip]
sql = "INSERT INTO lie_seo_include_ip_section (ip_section,include_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
except:
pass
def pipeline_lx_log_url(self):
self.read_file()
data = self.read_data()
self.include_baidu(data)
self.include_analyzing(data)
self.include_appoint_analyzing()
# self.include_baidu(data)
# self.include_analyzing(data)
# self.include_appoint_analyzing()
self.insert_seo_ip_section()
self.del_file()
......
......@@ -14,48 +14,61 @@ class PiWeekly:
@staticmethod
def pipeline_week_hot_goods():
d = time.strftime("%w", time.localtime())
# Monday
if int(d) == 1:
goods = {}
r = ConnList.WriteRedis()
# r = ConnList.LocalRedis()
r.delete('weekly_hot_goods')
# Time range
monday = DateHandler.date_time(7)
sunday = DateHandler.date_time(0)
try:
d = time.strftime("%w", time.localtime())
# Monday
if int(d) == 1:
goods = {}
r = ConnList.WriteRedis()
# r = ConnList.LocalRedis()
# Time range
monday = DateHandler.date_time(7)
sunday = DateHandler.date_time(0)
# monday = 1583078400
# sunday = 1583683200
# Read from the db
condition = {'start_time': monday, 'end_time': sunday,
'condition': ['u.is_type = 0', 'o.order_goods_type = 2', 'i.goods_id != 0',
'o.status > 2']}
order = ExOrder('').order_items(condition)
if len(order) >= 18:
r.delete('weekly_hot_goods')
for row in order:
goods_id = row['goods_id']
goods_number = row['goods_number']
if goods_id not in goods:
goods[goods_id] = goods_number
else:
goods[goods_id] += goods_number
# Read from the db
condition = {'start_time': monday, 'end_time': sunday, 'condition': ['u.is_type = 0', 'o.order_goods_type = 2', 'i.goods_id != 0', 'o.status > 2']}
order = ExOrder('').order_items(condition)
res = sorted(goods.items(), key=lambda goods: goods[1], reverse=True)
for row in order:
goods_id = row['goods_id']
goods_number = row['goods_number']
if goods_id not in goods:
goods[goods_id] = goods_number
else:
goods[goods_id] += goods_number
print(res)
res = sorted(goods.items(), key=lambda goods: goods[1], reverse=True)
# Take the top 18 entries
for i in range(0, 18):
goods_id = res[i][0]
goods_count = res[i][1]
print(goods_id, goods_count)
r.hset("weekly_hot_goods", goods_id, goods_count)
except:
pass
# Take the top 18 entries
for i in range(0, 18):
goods_id = res[i][0]
goods_count = res[i][1]
r.hset("weekly_hot_goods", goods_id, goods_count)
# Weekly level-1/level-2 hot-selling materials
@staticmethod
def pipeline_week_classify_hot_goods():
d = time.strftime("%w", time.localtime())
# Monday
if int(d) == 1:
r = ConnList.WriteRedis()
r.delete('first_classify_goods')
r.delete('second_classify_goods')
......@@ -135,7 +148,7 @@ class PiWeekly:
break
r.hset("first_classify_goods", class_id, lt)
@staticmethod
def pipeline_lx_brand():
d = time.strftime("%w", time.localtime())
......@@ -185,18 +198,14 @@ class PiWeekly:
@staticmethod
def pipeline_keyword_match():
start_time = DateHandler.today_between_months(1, '%Y-%m-%d')
end_time = DateHandler.today_between_months(0, '%Y-%m-%d')
start_time = DateHandler.str_to_unix(DateHandler.today_between_months(1, '%Y-%m-%d'), "%Y-%m-%d")
end_time = DateHandler.str_to_unix(DateHandler.today_between_months(0, '%Y-%m-%d'), "%Y-%m-%d")
db = ConnList.Order()
start_time = 1575129600
end_time = 1575216000
# Fetch no-result search keywords
has_result = DBHandler.scroll_read(url="http://so12.ichunt.com/search/searchLog/index",
body={"create_time/range": "%d,%d" % (start_time, end_time),
"flag/condition": "1", "p": 1,
"flag/condition": "2", "p": 1,
"offset": "1000", "is_scroll": "1"},
key='list')
......@@ -272,7 +281,7 @@ class PiWeekly:
index += 1
print(index)
time.sleep(0.01)
time.sleep(0.001)
wr_db = ConnList.Dashboard()
......@@ -289,3 +298,65 @@ class PiWeekly:
DBHandler.insert(wr_db, sql)
print(len(cmp_kl))
def pipeline_no_result_keyword_weekly(self):
d = time.strftime("%w", time.localtime())
# Tuesday (time.strftime('%w'): Sunday == 0)
if int(d) == 2:
rd = {}
# start_time = DateHandler.date_time(7)
# end_time = DateHandler.date_time(0)
# now_time = DateHandler.now_datetime()
start_time = 1573401600
end_time = 1574006400
now_time = 1573401600
db = ConnList.Dashboard()
# Fetch no-result search keywords
no_result = DBHandler.scroll_read(url="http://so12.ichunt.com/search/searchLog/index",
body={"create_time/range": "%d,%d" % (start_time, end_time),
"flag/condition": "2", "p": 1,
"offset": "1000", "is_scroll": "1"},
key='list')
for row in no_result:
keyword = row['keyword']
ip = row['ip']
if keyword not in rd:
rd[keyword] = {'count': 1, 'ip': [ip]}
else:
rd[keyword]['count'] += 1
if ip not in rd[keyword]['ip']:
rd[keyword]['ip'].append(ip)
print(len(rd))
index = 0
for keyword in rd:
index += 1
print(index)
self.deal_no_result(keyword, rd[keyword]['count'], len(rd[keyword]['ip']), now_time, db)
def deal_no_result(self, keyword, count, size, now_time, db):
i = 0
while i < 3:
try:
res = requests.get("http://127.0.0.1:8888/supplier/hqew?kw=" + keyword, timeout=5).json()
if 'data' in res:
data = res['data']
brand = data[0]['brand'] if len(data) > 0 else ''
else:
brand = ''
sql = "INSERT INTO lie_no_result_weekly (keyword,brand,search_count,search_user,cal_ts) VALUES ('%s','%s',%d,%d,%d)" \
% (keyword, brand, count, size, now_time)
DBHandler.insert(db, sql)
return
except requests.exceptions.RequestException:
i += 1
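# Each keyword is tried up to three times against the local hqew endpoint
# before being skipped.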
......@@ -39,7 +39,7 @@ class TsCrm:
if (len(order) > 0):
self.dd[outter_uid]['is_order'] = 2
else:
self.dd[outter_uid]['is_order'] = 1
self.dd[outter_uid]['is_order'] = 1
except:
self.dd[outter_uid]['is_order'] = 0
......@@ -133,6 +133,10 @@ class TsCrm:
self.update_data(dd)
else:
self.insert_data(dd)
if (dd['is_order'] == 2):
self.update_isValue(dd['user_id'])
except:
traceback.print_exc()
......@@ -147,6 +151,10 @@ class TsCrm:
(dd['is_order'], dd['last_order_time'], dd['order_num'], dd['model_num'], dd['order_amount'], dd['paid_amount'], dd['contact'], dd['contact_info'], dd['outter_uid'])
DBHandler.update(self.wr_db, sql)
def update_isValue(self, user_id):
sql = "UPDATE lie_user SET is_value=1 WHERE user_id=%d" % user_id
DBHandler.update(self.wr_db, sql)
......
......@@ -346,7 +346,6 @@ class TsERP:
period_weight_A = data['period_weight_A']
period_weight_B = data['period_weight_B']
period_platform_level = data['period_platform_level']
period_use_times_single = data['period_use_times_single']
period_use_times_six = data['period_use_times_six']
update_time = DateHandler.now_datetime()
......@@ -354,7 +353,6 @@ class TsERP:
period_weight_A = %.2f, \
period_weight_B = %.2f, \
period_platform_level = %.2f, \
period_use_times_single = %.2f, \
period_use_times_six = %.2f,\
update_time = %d \
WHERE erp_company_code = \'%s\' AND month = %d" % \
......@@ -363,7 +361,6 @@ class TsERP:
period_weight_A,
period_weight_B,
period_platform_level,
period_use_times_single,
period_use_times_six,
update_time,
cmp_code,
......
......@@ -4,6 +4,8 @@ from load.load_mysql import LoadMysql
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from utils.excel_handler import ExcelHandler
from utils.msg_handler import MsgHandler
from config.supplier import supplier
import json
import requests
import time
......@@ -342,6 +344,52 @@ class TsGoods(TsBase):
else:
rd['brand'][brand]['pur_cost'] += cost
def trans_all_up(self):
total = 0
es_url = "http://so12.ichunt.com/search/es/searchsku"
for sku in supplier:
sku_id = supplier[sku]
body = {"supplier_id": sku_id, "goods_status/condition": 1, "status/condition": 1}
r = requests.post(es_url, data=body)
error_code = r.json()['error_code']
if error_code != 1:
total += int(r.json()['data']['total'])
print(total)
return total
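# trans_all_up sums r.json()['data']['total'] for every supplier whose ES
# query succeeds (error_code != 1), i.e. the count of on-sale SKUs
# (goods_status=1, status=1).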
def trans_open_falcon(self, total):
ts = DateHandler.now_datetime()
db = ConnList.Dashboard()
payload = [
{
"endpoint": "bigdata-master/47.106.81.102",
"metric": "spider-sku",
"timestamp": ts,
"step": 3600,
"value": round(float(total / 10000), 2),
"counterType": "GAUGE",
"tags": "type=sku,msg=effective_sku_acount"
}
]
print(payload)
r = requests.post("http://172.18.137.35:1988/v1/push", data=json.dumps(payload))
sql = "INSERT INTO lie_effect_sku (effect_sku,update_time) VALUES (%d,%d)" % (total,DateHandler.now_datetime())
DBHandler.insert(db, sql)
# msg_api = "https://oapi.dingtalk.com/robot/send?access_token=c8033ce035f6d30331c49e7d980ac16de699c65c35fc3e765926904c5329d961"
# MsgHandler.send_point_dd_msg("【有效SKU数】: %d" % total, msg_api)
print(r.text)
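# The payload matches the Open-Falcon push format: the GAUGE value is the SKU
# total in units of 10,000, reported with an hourly step (3600 s).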
......
......@@ -17,8 +17,10 @@ class TsUser(TsBase):
# Excel标题、内容
title = ['账号', '注册时间']
content = ['mobile', 'create_time']
# Result list
rd = []
# Iterate over the result set
for row in self.data:
mobile = row['mobile']
......@@ -46,6 +48,7 @@ class TsUser(TsBase):
if index == len(results):
rd.append({'mobile': mobile if mobile != '' else email,
'create_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(create_time))})
# Generate the Excel file
ExcelHandler.write_to_excel(title, content, rd, "result", result_type=2)
......@@ -110,6 +113,7 @@ class TsUser(TsBase):
"""
def trans_search_keyword(self, test_ip, pf='NULL'):
fliter_list = ['`','\'','</A>','<A']
key_d = {}
for i in self.data:
r_ip = i['ip']
......@@ -118,13 +122,13 @@ class TsUser(TsBase):
i['keyword'] = i['keyword'].rstrip()
i['keyword'] = i['keyword'].upper()
if pf != 'NULL':
if pf == i['platform'] and '</A>' not in i['keyword'] and '<A' not in i['keyword']:
if pf == i['platform'] and not any(ft in i['keyword'] for ft in fliter_list):
if i['keyword'] not in key_d.keys():
key_d[i['keyword']] = 1
else:
key_d[i['keyword']] += 1
else:
if '</A>' not in i['keyword'] and '<A' not in i['keyword']:
if '</A>' not in i['keyword'] and not any(ft in i['keyword'] for ft in fliter_list):
if i['keyword'] not in key_d.keys():
key_d[i['keyword']] = 1
else:
......
......@@ -93,6 +93,10 @@ class DateHandler:
def today_between_months(months, fmt="%Y-%m-%d"):
return (datetime.date.today() - relativedelta(months=months)).strftime(fmt)
@staticmethod
def today_between_day(day, fmt="%Y%m%d"):
return (datetime.date.today() - relativedelta(days=day)).strftime(fmt)
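# e.g. today_between_day(1) -> yesterday as 'YYYYMMDD' (fmt is overridable)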
"""
Return the date a given offset before/after today
months: positive = back, negative = forward
......
......@@ -24,7 +24,7 @@ class DBHandler:
results = cursor.fetchall()
except:
db.rollback()
print(traceback.print_exc())
# print(traceback.print_exc())
return results
......@@ -42,7 +42,7 @@ class DBHandler:
except:
db.rollback()
traceback.print_exc()
print(sql)
# print(sql)
"""
MySQL: insert data
......@@ -58,7 +58,7 @@ class DBHandler:
except:
db.rollback()
traceback.print_exc()
# print(sql)
print(sql)
"""
MySQL: delete data
......@@ -74,7 +74,7 @@ class DBHandler:
except:
db.rollback()
traceback.print_exc()
print(sql)
# print(sql)
"""
Read HDFS data
......
......@@ -45,19 +45,17 @@ class ExcelHandler:
wb.save(file_name + '.xls')
"""
function:
Read each record from the *.xlsx file and return them in data_dic
Param:
records: a list containing every record to save
save_excel_name: the file name to save as
head_row_stu_arrive_star:
Return:
data_dic: a dict of the returned records
"""
function:
Read each record from the *.xlsx file and return them in data_dic
Param:
records: a list containing every record to save
save_excel_name: the file name to save as
head_row_stu_arrive_star:
Return:
data_dic: a dict of the returned records
"""
@staticmethod
def write_to_excel_with_openpyxl(title, content, result, file_name):
# Create a new workbook
wb = Workbook()
# Create a new excelWriter
......
......@@ -29,6 +29,17 @@ class MsgHandler:
res = requests.post(robot_api + robot, json=data)
print(res)
@staticmethod
def send_point_dd_msg(msg, robot_api):
data = {
"msgtype": "text",
"text": {
"content": msg
}
}
res = requests.post(robot_api, json=data)
print(res)
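# Usage sketch (robot_api is a full DingTalk webhook URL, access_token included):
#   MsgHandler.send_point_dd_msg('【有效SKU数】: %d' % total, msg_api)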
"""
Send email via the message API
......