Commit 5b397aa3 by lzzzzl

Initial commit

parent d4a0e717
Showing with 3486 additions and 34 deletions
from extract.ex_ly_sku import ExLySku
from utils.msg_handler import MsgHandler
import traceback
from extract.ex_sku_expose import ExSkuExpose
from extract.ex_ickey import ExIckey
from extract.ex_base import Base
from load.load_hbase import LoadHbase
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from pipeline.pi_goods import PiGoods
from pipeline.pi_erp import PiErp
from config.conn_list import ConnList
import sys
@@ -11,15 +18,59 @@ if __name__ == '__main__':
del_sku: delete the raw collected data from MongoDB
"""
if len(sys.argv) > 1:
# SKU price collection
if sys.argv[1] == 'ex_sku':
try:
ExLySku.ex_sku()
except:
MsgHandler.send_dd_msg("【联营SKU价格采集任务】 失败\n" +
"失败信息:" + str(traceback.print_exc()))
elif sys.argv[1] == 'del_sku':
try:
ExLySku.del_sku()
except:
MsgHandler.send_dd_msg("【Mongo数据删除任务】 失败\n" +
"失败信息:" + str(traceback.print_exc()))
elif sys.argv[1] == 'put_hdfs':
ExLySku.put_hdfs()
elif sys.argv[1] == 'load_hbase':
ExLySku.load_hbase()
elif sys.argv[1] == 'del_hdfs':
ExLySku.del_hdfs()
# SKU exposure
elif sys.argv[1] == 'sku_expose_collect':
ExSkuExpose.collect_rq()
elif sys.argv[1] == 'merge_file':
ExSkuExpose.merge_file()
# ICKEY self-operated collection
elif sys.argv[1] == 'ickey_zy':
ickey = ExIckey('ickey')
ickey.ex_ickey_goods()
# Yunhan (ICkey) index
elif sys.argv[1] == 'ickey_index':
ickey = ExIckey('ickey')
# collect
ickey.ex_ickey_index()
# upload the data
local_file = '/data3/hdfs_data/ickey_index/' + 'ickey_' + \
DateHandler.now_date(days=0, d_type=2) + '.txt'
hdfs_file = '/ickey_index/ickey_' + DateHandler.now_date(days=0, d_type=2) + '.txt'
DBHandler.hdfs_upload('/ickey_index/', local_file)
Base.rm_local_file(local_file)
# load the HDFS data into HBase
row = "%s,%s,%s,%s,%s,%s" % ("cf1:searchTrendAnalysis", "cf1:searchAreaTrendAnalysis",
"cf1:industryType", "cf1:search_num", "cf1:trade_num", "cf1:buyer_num")
msg = '【云汉指数采集】写入HBASE完成'
LoadHbase.cmd_load(row, "sku:ickey_index", hdfs_file, msg, '|')
# Yunhan index stock-up
elif sys.argv[1] == 'ickey_stock_find':
PiGoods.pipeline_zy_stock()
# ERP: fetch customer orders
elif sys.argv[1] == 'erp_company':
PiErp().pipeline_erp_company(ConnList.Credit(), ConnList.CreditSource())
# ERP: customer order info
elif sys.argv[1] == 'erp_info':
PiErp().pipeline_erp_info()
# ERP: customer order month
elif sys.argv[1] == 'erp_month':
PiErp().pipeline_erp_month(False, "WHERE is_credit = 1 ORDER BY business_time", "lie_basic_month", ConnList.Credit(), ConnList.CreditSource())
PiErp().pipeline_erp_weight("lie_basic_month", ConnList.Credit())
# ERP: customer order month with tolerance
elif sys.argv[1] == 'erp_month_tolerance':
PiErp().pipeline_erp_month(True, "WHERE is_credit = 1 ORDER BY business_time", "lie_basic_month_tolerance", ConnList.CreditLx(), ConnList.CreditSource())
PiErp().pipeline_erp_weight("lie_basic_month_tolerance", ConnList.CreditLx())
# ERP: update CMP_CODE
elif sys.argv[1] == 'erp_code':
PiErp().pipeline_erp_CmpCode()
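For reference, this dispatcher is driven entirely by the first CLI argument; a typical invocation looks like the following (a sketch — the entry-point file name main.py is an assumption):

    python main.py ex_sku        # collect consignment SKU prices
    python main.py put_hdfs      # push the exported price file to HDFS
    python main.py load_hbase    # bulk-load the HDFS file into HBase
    python main.py del_hdfs      # remove the HDFS file afterwards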
import happybase
conn = happybase.Connection("localhost", 9090)
print(conn.tables())
from pipeline.pi_purchase import PiPurchase
from pipeline.pi_email import PiEmail
from pipeline.pi_search import PiSearch
from pipeline.pi_goods import PiGoods
from pipeline.pi_daily import PiDaily
from extract.ex_sku_expose import ExSkuExpose
import sys
# send emails; the first argument picks the job
if __name__ == '__main__':
"""
zm_down: daily exclusive-store delisting count
safe_stock: stock alert
search_no_r: searches with no results
sku_expose: search exposure
operate_daily: operations daily report
"""
try:
if len(sys.argv) > 1:
if sys.argv[1] == 'core_daily':  # core daily report
PiDaily.pipeline_core_daily()
if sys.argv[1] == 'zm_down':
PiEmail.pipeline_zm_down()
if sys.argv[1] == 'zm_warn':
PiGoods.pipeline_zm_warn()
if sys.argv[1] == 'safe_stock':
PiPurchase.pipeline_safe_stock()
if sys.argv[1] == 'search_no_r':
PiSearch.search_no_result()
if sys.argv[1] == 'sku_expose':
# fetch HDFS data
hdfs_data = ExSkuExpose.get_hdfs_data()
# compute the exposure summary
ExSkuExpose.cal_sku_expose(hdfs_data)
# ExSkuExpose.cal_sku_expose_detail(hdfs_data)
# Lichuang (LCSC) exposure
ExSkuExpose.cal_lc_expose(hdfs_data)
if sys.argv[1] == 'zyly_match':
PiGoods.pipeline_zylxly_goods()
if sys.argv[1] == 'operate_daily':
PiDaily.pipeline_operate_daily()
if sys.argv[1] == 'operate_weekly':
PiDaily.pipeline_operate_weekly()
except:
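# NOTE: any pipeline failure is silently swallowed here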
pass
from pipeline.pi_order import PiOrder
from pipeline.pi_user import PiUser
from pipeline.pi_rank import PiRank
from config.conn_list import ConnList
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from utils.excel_handler import ExcelHandler
from extract.ex_erp import ExERP
from pipeline.pi_rank import PiRank
from pipeline.pi_erp import PiErp
from pipeline.pi_lx_log import PiLxLog
import math
import requests
import time
import json
from translate.ts_erp import TsERP
from pipeline.pi_weekly import PiWeekly
def main():
def test():
PiOrder.pipeline_order_items()
rd = []
db = ConnList.CreditSource()
# pi_user = PiUser('用户流')
# pi_user.pipeline_user_without_login()
pass
sql = "SELECT erp_company_code,poolfund_no,poolfund_source_no,entru_no,order_amount,business_time,\
deadline_day,receive_time,return_amount,return_time,is_settle,delay_day,delay_amount,\
is_credit,gross_profit,poolfund_type,source_type\
FROM lie_basic_detail"
result = DBHandler.read(db, sql)
for row in result:
rd.append({
'company': row[0],
'poolfund_no': row[1],
'poolfund_source_no': row[2],
'entru_no': row[3],
'order_amount': row[4],
'business_time': DateHandler.unix_to_date(row[5], fmt="%Y-%m-%d"),
'deadline_day': row[6],
'receive_time': DateHandler.unix_to_date(row[7], fmt="%Y-%m-%d"),
'return_amount': row[8],
'return_time': DateHandler.unix_to_date(row[9], fmt="%Y-%m-%d"),
'is_settle': '是' if row[10] == 1 else '否',
'delay_day': row[11],
'delay_amount': row[12],
'is_credit': '有' if row[13] == 1 else '无',
'gross_profit': row[14],
'poolfund_type': row[15],
'source_type': '供应链' if row[16] == 1 else '科技'
})
# Excel header row
title = ['公司', '资金池编号', '资金池来源编码', '入仓号', '下单金额', '业务时间', '订单期限(账期天数)', '应收日期(业务日期+期限)', '回款金额', '回款日期', '是否结清',
'逾期支付天数', '逾期支付金额', '信用', '毛利', '资金池类型', '来源']
# Excel content keys
content = ['company', 'poolfund_no', 'poolfund_source_no', 'entru_no', 'order_amount', 'business_time',
'deadline_day', 'receive_time',
'return_amount', 'return_time',
'is_settle', 'delay_day', 'delay_amount',
'is_credit', 'gross_profit', 'poolfund_type', 'source_type']
ExcelHandler.write_to_excel(title, content, rd, "result", result_type=2)
if __name__ == '__main__':
main()
test()
from extract.ex_order import ExOrder
from translate.ts_order import TsOrder
def ac_cal():
# timestamps
# start_0114 = 1547395200
start_0125 = 1548432000
# start_1217 = 1544976000
# start_1228 = 1546012800
start_0101 = 1546272000
verical_id = 5
arror_id = 10
# aggregates
verical = {'start_time': start_0101, 'end_time': start_0125,
'condition': ['i.supplier_id = ' + str(verical_id),
'o.order_pay_type != 3',
'o.order_source not like \'%pf=-1%\'']}
arror = {'start_time': start_0101, 'end_time': start_0125,
'condition': ['i.supplier_id = ' + str(arror_id),
'o.order_pay_type != 3',
'o.order_source not like \'%pf=-1%\'']}
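# 6.78 below is presumably the USD-to-CNY rate used to normalize order amounts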
ex_order = ExOrder('order detail')
ts_verical = TsOrder('ts', ex_order.order_items(verical)).trans_order_view(6.78)
ts_paid_verical = TsOrder('ts', ex_order.order_paid_items(verical)).trans_order_view(6.78)
ts_arror = TsOrder('ts', ex_order.order_items(arror)).trans_order_view(6.78)
ts_paid_arror = TsOrder('ts', ex_order.order_paid_items(arror)).trans_order_view(6.78)
print('-------------------------------------------')
print('Verical')
print('下单人数:', len(ts_verical['user']), '下单金额:', ts_verical['amount'])
print('付款人数:', len(ts_paid_verical['user']), '付款金额:', ts_paid_verical['amount'])
print('Arror')
print('下单人数:', len(ts_arror['user']), '下单金额:', ts_arror['amount'])
print('付款人数:', len(ts_paid_arror['user']), '付款金额:', ts_paid_arror['amount'])
print('汇总')
print('下单人数:', len(list(set(ts_verical['user'] + ts_arror['user']))),
'下单金额:', ts_verical['amount'] + ts_arror['amount'])
print('付款人数:', len(list(set(ts_paid_verical['user'] + ts_paid_arror['user']))),
'付款金额:', ts_paid_verical['amount'] + ts_paid_arror['amount'])
# per-channel aggregates
sum_canal = {'start_time': start_0101, 'end_time': start_0125,
'condition': ['i.supplier_id in (5, 10)',
'o.order_pay_type != 3',
'o.order_source not like \'%pf=-1%\'']}
ts_sum_canal = TsOrder('', ex_order.order_items(sum_canal)).trans_order_tag(6.78)
ts_sum_paid_canal = TsOrder('', ex_order.order_paid_items(sum_canal)).trans_order_tag(6.78)
sem = ts_sum_canal['sem']
sem_paid = ts_sum_paid_canal['sem']
wx = ts_sum_canal['wx']
wx_paid = ts_sum_paid_canal['wx']
wechat = ts_sum_canal['wechat']
wechat_paid = ts_sum_paid_canal['wechat']
qq = ts_sum_canal['qq']
qq_paid = ts_sum_paid_canal['qq']
print('-------------------------------------------')
print('SEM')
print('下单人数:', len(sem['user']), '下单金额:', sem['amount'],
'付款人数:', len(sem_paid['user']), '付款金额', sem_paid['amount'])
print('微信')
print('下单人数:', len(wx['user']), '下单金额:', wx['amount'],
'付款人数:', len(wx_paid['user']), '付款金额', wx_paid['amount'])
print('微信公众号')
print('下单人数:', len(wechat['user']), '下单金额:', wechat['amount'],
'付款人数:', len(wechat_paid['user']), '付款金额', wechat_paid['amount'])
print('QQ')
print('下单人数:', len(qq['user']), '下单金额:', qq['amount'],
'付款人数:', len(qq_paid['user']), '付款金额', qq_paid['amount'])
# split new vs. returning users
print('-------------------------------------------')
ts_user_type = TsOrder('', ex_order.order_items(sum_canal)).trans_user_type(6.78, start_0101)
ts_user_paid_type = TsOrder('', ex_order.order_paid_items(sum_canal)).trans_user_type(6.78, start_0101)
print(ts_user_type)
print(ts_user_paid_type)
# export to Excel
# TsOrder('', ex_order.order_paid_items(sum_canal)).trans_order_people(6.78)
from sklearn import model_selection, preprocessing, linear_model, naive_bayes, metrics, svm
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn import decomposition, ensemble
from sklearn.externals import joblib
from config.conn_list import ConnList
import pandas as pd
import numpy as np
import pymysql
db = ConnList.Dashboard()
model = joblib.load("train_model.m")
tfidf_vect_ngram_chars = joblib.load("tfidf_vect_ngram_chars.m")
test = pd.Series(["0402"])
test_tfidf_ngram_chars = tfidf_vect_ngram_chars.transform(test)
# read from the DB
def read(db, sql):
results = {}
cursor = db.cursor()
try:
cursor.execute(sql)
results = cursor.fetchall()
except:
db.rollback()
return results
# check whether a column is the index column
def verify_index(data):
# score
point = 0
max_num = 0
total = len(data)
# iterate over the values
try:
for i in data:
# check for a non-negative integer
if str(int(i)).isdigit() and int(i) >= 0:
v = int(i)
# increasing, with a step of less than 100
if (v > max_num) and (v - max_num < 100):
max_num = v
point += 1
except:
pass
# treat the column as the index column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check whether a column is the quantity column
def verify_number(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
for i in data:
# check for an integer value
if str(int(i)).isdigit():
point += 1
except:
pass
# treat the column as the quantity column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check for the classification column
def verify_classify(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
for i in data:
# look up the classification
sql = "SELECT 1 FROM lie_bom_class_name WHERE class_name like '%%%s%%'" % i
result = read(db,sql)
if len(result) > 0:
point += 1
except:
pass
# treat the column as the classification column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check for the brand column
def verify_brand(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
for i in data:
brand_name = str(i).split('(')[0].upper()
# look up the brand
sql = "SELECT 1 FROM lie_bom_brand_name WHERE brand_name like '%%%s%%'" % brand_name
result = read(db,sql)
if len(result) > 0:
point += 1
except:
pass
# treat the column as the brand column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check for the parameter column
def verify_param(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
test = pd.Series(data)
test_tfidf_ngram_chars = tfidf_vect_ngram_chars.transform(test)
predictions = model.predict(test_tfidf_ngram_chars)
for pre in predictions:
if pre == 'param':
point += 1
except:
pass
# treat the column as the parameter column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check for the part-number column
def verify_goods_name(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
test = pd.Series(data)
test_tfidf_ngram_chars = tfidf_vect_ngram_chars.transform(test)
predictions = model.predict(test_tfidf_ngram_chars)
for pre in predictions:
if pre == 'goods_name':
point += 1
except:
pass
# treat the column as the part-number column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
# check for the package column
def verify_encap(data):
# score
point = 0
total = len(data)
# iterate over the values
try:
test = pd.Series(data)
test_tfidf_ngram_chars = tfidf_vect_ngram_chars.transform(test)
predictions = model.predict(test_tfidf_ngram_chars)
for pre in predictions:
if pre == 'encap':
point += 1
except:
pass
# treat the column as the package column when at least 60% of the values qualify
if (point / total) >= 0.6:
return True
else:
return False
def main():
test = pd.read_excel('BOM选型标准模板_ICkey2.xlsx')
columns = list(test.columns)
# fill NaN per column so the verify_* checks see a uniform placeholder
for col in columns:
test[col] = test[col].fillna('NA')
col_index = {'索引': -1, '数量': -1, '分类': -1, '品牌': -1, '参数': -1, '型号': -1, '封装': -1}
col_list = []
# index column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_index(test[col]):
col_index['索引'] = i
col_list.append(i)
break
# quantity column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_number(test[col]):
col_index['数量'] = i
col_list.append(i)
break
# classification column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_classify(test[col]):
col_index['分类'] = i
col_list.append(i)
break
# brand column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_brand(test[col]):
col_index['品牌'] = i
col_list.append(i)
break
# parameter column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_param(test[col]):
col_index['参数'] = i
col_list.append(i)
break
# part-number column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_goods_name(test[col]):
col_index['型号'] = i
col_list.append(i)
break
# package column
for i in range(0, len(columns)):
col = columns[i]
if i not in col_list and verify_encap(test[col]):
col_index['封装'] = i
col_list.append(i)
break
print(col_list)
if __name__ == '__main__':
main()
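A quick sanity check of the column classifiers outside of main() (a sketch; the sample values are made up):

    import pandas as pd
    # an increasing integer column should be detected as the index column
    print(verify_index(pd.Series([1, 2, 3, 4, 5])))    # expected: True
    # free text should not be detected as a quantity column
    print(verify_number(pd.Series(['a', 'b', 'c'])))   # expected: False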
No preview for this file type
import pymysql
from config.db import *
import pymongo as pm
import redis
class ConnList:
@staticmethod
def Dashboard():
conf = dashboard_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Order():
conf = order_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def WriteOrder():
conf = order_write_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Bigdata():
conf = bigdata_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Wms():
conf = wms_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Zy():
conf = zy_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def lxData():
conf = lxdata_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Behavior():
conf = behavior_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def IcData():
conf = ic_data_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def SupData():
conf = supplier_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Activity():
conf = activity_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def LyGoods():
conf = lygoods_server
return redis.Redis(**conf)
@staticmethod
def Offer():
conf = offer_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Credit():
conf = credit_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def CreditSource():
conf = credit_source_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def CreditLx():
conf = credit_lx_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def CreditLocal():
conf = credit_local_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def CreditSourceLocal():
conf = credit_source_local_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Chain():
conf = chain_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Local():
conf = local_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def Sz():
conf = sz_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def MongoDB():
conf = mongodb_server
return pm.MongoClient(str(conf['host']))
@staticmethod
def SeoMongoDB():
conf = seo_mongodb_server
return pm.MongoClient(str(conf['host']))
@staticmethod
def LocalMongoDB():
conf = local_mongodb_server
return pm.MongoClient(str(conf['host']))
@staticmethod
def LocalRedis():
conf = local_redis
return redis.Redis(**conf)
@staticmethod
def WriteRedis():
conf = write_redis
return redis.Redis(**conf)
@staticmethod
def WriteRedis23():
conf = write_redis_23
return redis.Redis(**conf)
@staticmethod
def LocalTestRedis():
conf = local_test_redis
return redis.Redis(**conf)
@staticmethod
def SzLxData():
conf = sz_lx_data
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def WrCrm():
conf = wr_crm_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def LocalLx():
conf = local_liexin
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def LocalRank():
conf = local_rank
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
@staticmethod
def LxRank():
conf = lx_rank
return pymysql.connect(str(conf['host']), conf['user'], conf['password'],
conf['db_name'], charset='utf8')
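Every factory above opens a fresh connection on each call, so call sites own the connection lifecycle (a minimal sketch):

    db = ConnList.Dashboard()
    try:
        cursor = db.cursor()
        cursor.execute("SELECT 1")
        print(cursor.fetchone())
    finally:
        db.close()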
#!/usr/bin/env python
# -*- coding:utf-8 -*-
dashboard_server = {
'host': 'localhost',
'user': 'dashboard',
'password': 'ichunt5Dashboard@',
'db_name': 'dashboard'
}
order_server = {
'host': '172.18.137.22',
'user': 'huntdbslave',
@@ -8,6 +15,48 @@ order_server = {
'db_name': 'hunt2016'
}
order_write_server = {
'host': '172.18.137.21',
'user': 'huntdbuser',
'password': 'mLssy2@@!!@$#yy',
'db_name': 'hunt2016'
}
bigdata_server = {
'host': '172.18.137.37',
'user': 'bigdata',
'password': 'bdYm2yy2mmyzlmlly',
'db_name': 'bigdata'
}
wms_server = {
'host': '172.18.137.33',
'user': 'LxWmsUserRead',
'password': 'Xu0U2oix3soYmosflmxIiZmyt',
'db_name': 'liexin_wms'
}
lxdata_server = {
'host': '172.18.137.33',
'user': 'LxDDUsedRead',
'password': '0o9u0U2oixoYmosflmxXtZmyt',
'db_name': 'liexin_data'
}
zy_server = {
'host': '172.18.137.33',
'user': 'LxDDUsedRead',
'password': '0o9u0U2oixoYmosflmxXtZmyt',
'db_name': 'liexin_data'
}
behavior_server = {
'host': '172.18.137.21',
'user': 'lxbehaviorread',
'password': 'XwZa920OiMLymsZzytread',
'db_name': 'liexin_behavior'
}
ic_data_server = {
'host': '172.18.137.21',
'user': 'dtuserRead',
@@ -15,6 +64,163 @@ ic_data_server = {
'db_name': 'icdata'
}
supplier_server = {
'host': '172.18.137.21',
'user': 'SupDbUserR',
'password': 'Supssy2Ryxy',
'db_name': 'liexin_supp'
}
activity_server = {
'host': '172.18.137.21',
'user': 'lxtopic',
'password': 'mXYToic@#@yxIy',
'db_name': 'liexin_topic'
}
local_server = {
'host': 'localhost',
'user': 'root',
'password': '123',
'db_name': 'dashboard'
}
offer_server = {
'host': '192.168.1.232',
'user': 'root',
'password': '123456789',
'db_name': 'offer'
}
sz_server = {
'host': '172.18.137.21',
'user': 'ichunt_szusr',
'password': 'mXYm2##@!!@$#yy',
'db_name': 'ichunt_sz'
}
sz_lx_data = {
'host': '172.18.137.35',
'user': 'LxszUsed',
'password': '0o9u0SDxSflmxXtZmyt',
'db_name': 'liexin_sz_data'
}
mongodb_server = {
'host': 'mongodb://ichunt:huntmon66499@172.18.137.23/ichunt'
}
seo_mongodb_server = {
'host': 'mongodb://SeoUxPxts:hunxP2i1JlLm@172.18.137.23/seo'
}
local_mongodb_server = {
'host': 'mongodb://ichunt:huntmon6699@192.168.1.237/ichunt'
}
lygoods_server = {
'host': '172.18.137.39',
'password': 'icDb29mLy2s',
'port': '6379'
}
wr_crm_server = {
'host': '172.18.137.21',
'user': 'LxCrmUser',
'password': 'xUTmu0XsdUqoZIim2y',
'db_name': 'liexin_crm'
}
credit_server = {
'host': 'fkdb-master.ichunt.cc',
'user': 'Cdimz200o',
'password': 'mLssyDxmsySZmBomy',
'db_name': 'liexin_credit'
}
credit_source_server = {
'host': 'fkdb-master.ichunt.cc',
'user': 'tZzU0serMq',
'password': 'mLssyD2sySZmBo1y',
'db_name': 'liexin_credit_source'
}
credit_lx_server = {
'host': 'fkdb-master.ichunt.cc',
'user': 'PxLxcdp201',
'password': 'Oxnt2n0qplztMszym',
'db_name': 'liexin_credit_lx'
}
credit_local_server = {
'host': 'localhost',
'user': 'dashboard',
'password': 'ichunt5Dashboard@',
'db_name': 'dashboard'
}
credit_source_local_server = {
'host': 'localhost',
'user': 'dashboard',
'password': 'ichunt5Dashboard@',
'db_name': 'dashboard'
}
chain_server = {
'host': '172.18.137.21',
'user': 'ScsDbsy2x',
'password': 'xscSsy2@@!!@x%Yxm',
'db_name': 'liexin_sc'
}
rabbitMq_server = {
'host': '172.18.137.23',
# 'host': '192.168.1.237',
'user': 'huntadmin',
'password': 'jy2y2900'
}
local_redis = {
'host': '192.168.1.235',
'port': '6379',
'password': 'icDb29mLy2s'
}
write_redis = {
'host': '172.18.137.38',
'password': 'icDb29mLy2s',
'port': '6379'
}
write_redis_23 = {
'host': '172.18.137.21',
'password': 'icDb29mLy2s',
'port': '6379'
}
local_test_redis = {
'host': '192.168.1.235',
'password': 'icDb29mLy2s',
'port': '6379'
}
local_liexin = {
'host': '192.168.2.232',
'user': 'liexin',
'password': 'liexin#zsyM',
'db_name': 'liexin'
}
local_rank = {
'host': 'localhost',
'user': 'dashboard',
'password': 'ichunt5Dashboard@',
'db_name': 'dashboard'
}
lx_rank = {
'host': '172.18.137.37',
'user': 'Drakxs',
'password': 'sXtm23@!!@$2yoZ',
'db_name': 'data_rank'
}
supplier = {
'future': 1,
'rochester': 3,
'tme': 4,
'verical': 5,
'element14': 6,
'digikey': 7,
'chip1stop': 8,
'aipco': 9,
'arrow': 10,
'alliedelec': 12,
'avnet': 13,
'mouser': 14,
'peigenesis': 19,
'powell': 20,
'rs': 21,
'master': 1672,
'rutronik': 1673,
'corestaff': 1675,
'buerklin': 1676,
'microchip': 1677,
'heilind': 1678,
'ti': 1679,
'撮合': -1000,
'猎芯联营-技术采集': 17,
'猎芯联营-渠道开发': 17,
'猎芯自营': 10000,
}
# Liexin consignment collection
supplier_collect = ['L0000096', 'L0000004', 'L0000218']
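Downstream scripts resolve channel names through the supplier map above; e.g. ac_cal() hard-codes verical_id = 5 and arror_id = 10, which correspond to the 'verical' and 'arrow' entries (a sketch):

    sid = supplier['verical']              # -> 5
    condition = 'i.supplier_id = %d' % sid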
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExActivity(Base):
"""
Wallet campaign
Fields:
wpr_id: wallet preferential rule auto-increment id
rule_name: rule name
start_time: campaign start time (int)
end_time: campaign end time (int)
rule_name: campaign name
mall_type: scope: 1 - whole site, 2 - self-operated, 3 - consignment
receiver_type: cashback recipient: 1 - orderer only, 2 - inviter only, 3 - inviter and invitee
inviter_cashback_scale: inviter cashback ratio
invitee_cashback_scale: invitee cashback ratio
inviter_require_amount: minimum order amount for the inviter
invitee_require_amount: minimum order amount for the invitee
inviter_max_preferential_amount: maximum cashback for the inviter
invitee_max_preferential_amount: maximum cashback for the invitee
"""
def wallet_activity(self, condition):
col = ['wpr_id',
'rule_name',
'rule_name',
'mall_type',
'start_time',
'end_time',
'receiver_type',
'invitee_cashback_scale',
'inviter_cashback_scale',
'inviter_require_amount',
'invitee_require_amount',
'inviter_max_preferential_amount',
'invitee_max_preferential_amount']
col_str = super().col_to_str(col)
db = ConnList.Order()
wpr_id = condition['wpr_id']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_wallet_preferential_rule WHERE wpr_id = %d AND %s" % (col_str, wpr_id, con_str)
results = DBHandler.read(db=db, sql=sql)
return super().result_to_dict(col, results)
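Call sites pass the rule id plus extra WHERE fragments (a sketch; the values are made up):

    ex = ExActivity('wallet activity')
    rows = ex.wallet_activity({'wpr_id': 1, 'condition': ['status = 1']})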
from utils.date_handler import DateHandler
import subprocess
class Base:
def __init__(self, name):
print('抽取数据:', name)
print('--------', name, '---------')
# assemble the WHERE condition
@staticmethod
@@ -13,7 +15,7 @@ class Base:
con_str = ''
index = 0
if len(condition) > 0:
con_str += ' AND '
# con_str += ' AND '
for row in condition:
index += 1
if index < len(condition):
@@ -53,4 +55,47 @@
r_list.append(r_dict)
return r_list
# convert result rows to a flat list
@staticmethod
def result_to_list(result):
r_l = []
for row in result:
r_l.append(row[0])
return r_l
# assemble a VALUES string
@staticmethod
def value_to_str(col, result):
value = ''
index = 0
for c in col:
index += 1
if index < len(col):
value += '\'' + str(result[c]) + '\','
else:
value += '\'' + str(result[c]) + '\''
return value
# delete a local file (Linux)
@staticmethod
def rm_local_file(file_name):
cmd = "rm -f " + file_name
subprocess.getoutput(cmd)
# read a local file
@staticmethod
def read_local_file(file_name):
result = []
fo = open(file_name)
for line in fo.readlines():
result.append(line)
return result
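value_to_str pairs a column list with one result dict to build the VALUES part of an INSERT (a sketch with made-up values):

    cols = ['goods_sn', 'pn']
    row = {'goods_sn': 123, 'pn': 'X7R'}
    print(Base.value_to_str(cols, row))    # -> '123','X7R'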
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExBehavior(Base):
"""
Behavior log
"""
def behavior_log(self, condition):
# columns to select
col = ['ip', 'behavior', 'platform', 'create_time', 'user_id', 'adtag', 'param', 'scene', 'user_sign', 'ptag']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Behavior()
con_str = super().condition_to_str(condition['condition'])
start_time = condition['start_time']
end_time = condition['end_time']
sql = "SELECT %s \
FROM lie_behavior_log \
WHERE create_time BETWEEN %d AND %d AND %s" % (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExCrm(Base):
def crm_user(self, start_time, end_time):
# CRM database
db = ConnList.WrCrm()
sql = "SELECT outter_uid,user_id FROM lie_user WHERE create_time BETWEEN %d AND %d AND source IN (1,2) LIMIT 100000" % (start_time, end_time)
results = DBHandler.read(db=db, sql=sql)
return results
\ No newline at end of file
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExDashboard(Base):
def email_list(self, condition):
# columns to select
con_str = super().condition_to_str(condition['condition'])
# dashboard database
db = ConnList.Dashboard()
sql = "SELECT email \
FROM \
lie_email_list \
WHERE %s" \
% con_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
print('final_result', len(final_result))
return final_result
"""
Aggregate order data
"""
def sum_order(self, condition):
# columns to select
col = ['sum(order_count)', 'sum(order_paid_count)']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s \
FROM lie_order_cal \
WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Extract adtag values
"""
def lie_adtag(self):
# columns to select
col = ['one_level_channel_en']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
sql = "SELECT %s FROM lie_adtag_config GROUP BY one_level_channel_en" % col_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch Baidu Analytics page URLs
"""
def baidu_page(self, condition):
# columns to select
col = ['url', 'pv_count']
col_str = super().col_to_str(col)
# dashboard database
db = ConnList.Dashboard()
sql = "SELECT %s FROM lie_baidu_cal_visitpage WHERE cal_ts = \'%s\'" % (col_str, condition)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch ptag config
"""
def lie_ptag(self):
# columns to select
col = ['ptag', 'regex', 'id']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
sql = "SELECT %s FROM lie_page" % col_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch campaign data
"""
def re_activity(self, condition):
# columns to select
col = ['id', 'sign', 'name', 'start_time', 'end_time', 'status']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Activity()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_activity WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch quiz answer data
"""
def answer(self, condition):
# columns to select
col = ['user_id', 'activity_id', 'score']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Order()
con_str = super().condition_to_str(condition['condition'])
start_time = condition['start_time']
end_time = condition['end_time']
sql = "SELECT %s FROM lie_question_user_score WHERE \
user_id not in (SELECT user_id FROM lie_user_main WHERE is_test = 1) AND \
create_time BETWEEN %d AND %d AND %s" % (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch self-operated part-number data
"""
def zy_goods(self, condition):
# columns to select
col = ['goods_id']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
start_time = condition['start_time']
sql = "SELECT %s FROM lie_sku_expose_goods WHERE cal_ts = %d" %\
(col_str, start_time)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Shence (Sensors Analytics) PV/UV
"""
def ex_shence_pu(self, condition):
# columns to select
col = ['pv', 'uv']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
start_time = condition['start_time']
sql = "SELECT %s FROM lie_shence_cal WHERE cal_ts = \'%s\'" % (col_str, start_time)
results = DBHandler.read(db, sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Total SKU exposure
"""
def sku_sum_goods(self, condition):
col = ['goods_list']
# connect to the database
db = ConnList.Dashboard()
start_time = condition['start_time']
sql = "SELECT sum(goods_list) FROM lie_sku_expose WHERE cal_ts = \'%s\'" % start_time
results = DBHandler.read(db, sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Weekly average data
"""
def avg_operate_data(self, condition):
col = ['avg(pv)', 'avg(uv)', 'avg(pv)/avg(uv)', 'sum(reg_user)', 'avg(active_user)']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Dashboard()
start_time = condition['start_time']
end_time = condition['end_time']
sql = "SELECT %s FROM lie_operate_daily WHERE cal_ts BETWEEN \'%s\' AND \'%s\'" % (col_str, start_time, end_time)
results = DBHandler.read(db, sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
from extract.ex_base import Base
from utils.db_handler import DBHandler
from config.conn_list import ConnList
from suds.client import Client
import hprose
import json
class ExERP(Base):
def __init__(self, date):
self.api_url = 'http://119.23.228.186:6868/ormrpc/services'
self.cmp_url = 'http://119.23.228.186:8081'
self.date = date
self.credit = ConnList.Credit()
self.credit_source = ConnList.CreditSource()
self.credit_local = ConnList.CreditLocal()
"""
Fetch the ERP exchange rate
"""
def get_erp_exchange(self):
client = Client(self.api_url + '/WSIchuntjKFacade?wsdl')
params_dict = {"CURRENCY": "美元", "BIZDATE": self.date}
return client.service.getExchangeRate(json.dumps(params_dict))
"""
Fetch ERP raw data
"""
def get_erp_company(self, company):
try:
client = hprose.HttpClient(self.cmp_url)
params_dict = {"CUSTOMER": company}
res = client.getReceivableList(json.dumps(params_dict))
return json.loads(res)
except:
return {}
"""
Fetch ERP data from the DB
"""
def get_erp_data(self, con_str, db):
col = ['id', 'order_amount', 'entru_no', 'business_time', 'erp_company_code', 'gross_profit', 'deadline_day', 'delay_day', 'return_amount', 'return_time', 'receive_time', 'tolerance']
col_str = super().col_to_str(col)
sql = "SELECT %s FROM lie_basic_detail %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch the ERP company list
"""
def get_erp_list(self, db):
sql = "SELECT erp_company_code,company_name FROM lie_com_credits WHERE erp_company_code != \'\' GROUP BY erp_company_code,company_name"
result = DBHandler.read(db=db, sql=sql)
rd = []
for row in result:
erp_company_code = row[0]
company_name = row[1]
rd.append({
'cmp_code': erp_company_code,
'cmp_name': company_name
})
return rd
"""
Fetch ERP company codes
"""
def get_erp_code(self, db):
sql = "SELECT erp_company_code FROM lie_basic_detail WHERE erp_company_code != \'\' GROUP BY erp_company_code"
result = DBHandler.read(db=db, sql=sql)
rd = []
for row in result:
erp_company_code = row[0]
rd.append(erp_company_code)
return rd
"""
Fetch distinct ERP months
"""
def get_erp_diff_month(self, table, db):
sql = "SELECT month FROM %s GROUP BY month" % table
result = DBHandler.read(db=db, sql=sql)
rd = []
for row in result:
month = row[0]
rd.append(month)
return rd
"""
Fetch the last six months of data
"""
def get_six_data(self, months, table, db):
sql = "SELECT appoint_tolerance,erp_company_code,SUM(delay_avg_day * receive_count) / SUM(receive_count),SUM(period_user_level) / 6,SUM(receive_amount),SUM(appoint_tolerance) \
FROM %s \
WHERE MONTH IN %s \
GROUP BY erp_company_code \
ORDER BY MONTH DESC" % (table, months)
result = DBHandler.read(db=db, sql=sql)
rd = []
for row in result:
rd.append({
'tolerance': int(row[0]),
'cmp_code': row[1],
'delay_avg_day': float(row[2]) if row[2] is not None else 0,
'use_level': float(row[3]) if row[3] is not None else 0,
'receive_amount': float(row[4]),
'total_tolerance': int(row[5])
})
return rd
# def get_six_data_temp(self, months, table, db):
# sql = "SELECT appoint_tolerance,erp_company_code,delay_avg_day,receive_amount \
# FROM %s \
# WHERE MONTH IN %s" % (table, months)
# result = DBHandler.read(db=db, sql=sql)
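Typical usage binds the instance to a business date first (a sketch; the WSDL endpoint must be reachable):

    erp = ExERP('2019-12-13')
    rate = erp.get_erp_exchange()              # exchange rate for that date
    companies = erp.get_erp_list(erp.credit)   # cmp_code / cmp_name dicts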
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
import pymysql
import requests
class ExGoods(Base):
"""
Self-operated SKUs
"""
def ex_zy_goods(self, condition):
# columns to select
col = ['goods_name', 'ladder_price']
col_str = super().col_to_str(col)
rd = {}
# connect to the database
db = ConnList.Zy()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s \
FROM lie_goods \
WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
for row in results:
rd[row[0]] = row[1]
return rd
"""
Consignment supplier list
"""
def ex_ly_supplier(self, condition):
# columns to select
col = ['supplier_code', 'channel_uid', 'supplier_name']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.SupData()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_supplier_channel WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
return super().result_to_dict(col, results)
"""
Fetch all SKU ids and prices under Liexin consignment
"""
def ex_ly_goods(self):
host = '172.18.137.29'
user = 'LxiCSpuR35'
password = 'Mysx3Tyzlo00oxlmllyR'
rd = {}
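# SKUs are sharded across 10 databases (liexin_sku_0..9), each holding 10 tables (lie_sku_0..9)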
for i in range(0, 10):
db_name = 'liexin_sku_' + str(i)
print(db_name)
for j in range(0, 10):
table_name = 'lie_sku_' + str(j)
db = pymysql.connect(host, user, password, db_name, charset='utf8')
sql = "SELECT spu_id,ladder_price,canal FROM %s \
WHERE supplier_id = %d AND goods_status = 1 AND ladder_price != \'\'" % (table_name, 17)
result = DBHandler.read(db, sql)
for row in result:
rd[row[0]] = {'price': row[1], 'canal': row[2]}
print(len(rd))
return rd
"""
Fetch ERP consignment purchase data
"""
def ex_erp_goods(self, condition):
# columns to select
col = ['supplier_name', 'sale_man', 'sale_order_no', 'pur_order_no',
'sale_man', 'pur_order_bizdate']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Order()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_erp_purchase \
WHERE pur_order_bizdate BETWEEN %d AND %d AND %s" % \
(col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
from extract.ex_base import Base
from config.conn_list import ConnList
from config.db import rabbitMq_server
from utils.rabbitMq_handler import Customer
from utils.msg_handler import MsgHandler
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from utils.log_handler import LogHandler
import traceback
import pymysql
import time
import json
class ExIckey(Base):
"""
Process ICKEY self-operated data
"""
def ex_ickey_goods(self):
mq_db = ConnList.Dashboard()
def callback(ch, method, properties, body):
try:
self.storage(body, mq_db)
time.sleep(0.1)
except pymysql.err.OperationalError:
MsgHandler.send_dd_msg("【ICKEY数据库出错】\n" + "【出错数据】" + str(body) + "\n" + traceback.format_exc())
customer.send_task(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
MsgHandler.send_dd_msg("【ICKEY采集出错】\n" + "【出错数据】" + str(body) + "\n" + traceback.format_exc())
else:
ch.basic_ack(delivery_tag=method.delivery_tag) # tell mq the task has done
# check the number of messages left in the queue
msg_count = ch.queue_declare(queue='ickey_goods_store', durable=True).method.message_count
# task finished
if msg_count <= 0:
# stop consuming
ch.stop_consuming()
customer = Customer(rabbitMq_server['user'], rabbitMq_server['password'],
rabbitMq_server['host'], 'ickey_goods_store', '')
customer.server_forever(callback)
"""
Read the Yunhan index feed and write it to a file
"""
def ex_ickey_index(self):
# send the task-start notification
# MsgHandler.send_dd_msg("【云汉指数数据采集】 启动")
start = DateHandler.now_datetime()
# open the output file
file_name = r'/data3/hdfs_data/ickey_index/' + 'ickey_' + DateHandler.now_date(days=0, d_type=2) + '.txt'
fo = open(file_name, "w")
def callback(ch, method, properties, body):
try:
# check whether data remains in the queue
if ch.queue_declare(queue='ickey_index_data', durable=True).method.message_count > 0:
# parse and write the payload
body_json = json.loads(str(body, encoding="utf-8"))
time.sleep(0.001)
result = "%s|%s|%s|%s|%s|%s|%s" % (body_json['goods_name'],
body_json['searchTrendAnalysis'],
body_json['searchAreaTrendAnalysis'],
body_json['industryType'],
body_json['search_num'],
body_json['trade_num'],
body_json['buyer_num'])
fo.write(result + "\n")
else:
ch.stop_consuming()
except Exception:
MsgHandler.send_dd_msg("【云汉指数采集出错】\n" + "【出错数据】" + str(body) + "\n" + traceback.format_exc())
else:
ch.basic_ack(delivery_tag=method.delivery_tag) # tell mq the task has done
# check the number of messages left in the queue
msg_count = ch.queue_declare(queue='ickey_index_data', durable=True).method.message_count
# flush the write buffer
if msg_count % 10000 == 0:
fo.flush()
# task finished
if msg_count <= 0:
# send the task-end notification
end = DateHandler.now_datetime()
dur = DateHandler.cal_duration(start, end, t_type=2)
MsgHandler.send_dd_msg("【云汉指数采集任务】 结束\n" +
"任务耗时:" + str(dur) + "分钟\n")
# close the file
fo.close()
# stop consuming
ch.stop_consuming()
customer = Customer(rabbitMq_server['user'], rabbitMq_server['password'],
rabbitMq_server['host'], 'ickey_index_data', '')
customer.server_forever(callback)
"""
Store ICKEY self-operated data
"""
def storage(self, body, db):
result = json.loads(body)
if 'goods_sn' in result:
sql_exist = "SELECT id FROM lie_ickey_goods_stock WHERE goods_sn = %d" % result['goods_sn']
exist = DBHandler.read(db, sql_exist)
# insert if absent, update if present
# insert
if len(exist) == 0:
sql = "INSERT INTO lie_ickey_goods_stock(goods_sn, pn, goods_name, brand_name, stock, moq, increment, \
url, ts, prices) VALUES ('%d', '%s', '%s', '%s', '%d', '%d', '%d', '%s', '%d', '%s')" % \
(result['goods_sn'], result['pn'], result['goods_name'], result['brand_name'], int(result['stock'][1]),
int(result['stock'][0]), result['increment'], result['url'], result['time'], result['prices'])
DBHandler.insert(db, sql)
# update
else:
sql = "UPDATE lie_ickey_goods_stock SET stock='%d', moq='%d', increment='%d', prices='%s', ts = '%d' \
WHERE goods_sn = '%d'" % \
(int(result['stock'][1]), int(result['stock'][0]), result['increment'],
result['prices'], DateHandler.now_datetime(), result['goods_sn'])
cursor = db.cursor()
try:
cursor.execute(sql)
except:
db.rollback()
# LogHandler.elk_log(str(traceback.format_exc()), 500, "/data3/dataLog/ex_ickey.log", "EX_ICKEY")
"""
Fetch all Yunhan index rows with trade_num >= 10
"""
def ex_ickey_index_goods(self):
index = 0
sum = 0
rd = {}
hdfs_file = r'/ickey_index/' + 'ickey_' + \
DateHandler.now_date(days=0, d_type=2) + '.txt'
hdfs_data = DBHandler.hdfs_read(hdfs_file)
print(hdfs_file)
for row in hdfs_data:
row = str(row, encoding="utf-8")
try:
goods_name = str(row).split('|')[0]
search_num = int(str(row).split('|')[4])
trade_num = int(str(row).split('|')[5])
buyer_num = int(str(row).split('|')[6])
sum += 1
if trade_num >= 10 and goods_name not in rd:
rd[goods_name] = {'ickey_search_count': search_num, 'ickey_trade_count': trade_num,
'ickey_buyer_count': buyer_num, 'create_time': DateHandler.now_datetime(),
'update_time': DateHandler.now_datetime(), 'goods_name': goods_name}
index += 1
except:
pass
print(index, sum)
return rd
@@ -2,6 +2,8 @@ from extract.ex_base import Base
from config.conn_list import ConnList
from utils.date_handler import DateHandler
from utils.msg_handler import MsgHandler
from utils.log_handler import LogHandler
import subprocess
class ExLySku(Base):
@@ -79,3 +81,65 @@ class ExLySku(Base):
MsgHandler.send_dd_msg("【Mongo数据删除任务】 结束\n" +
"任务耗时:" + str(dur) + "分钟\n" +
"删除数量:" + str(del_doc.deleted_count))
@staticmethod
def put_hdfs():
# send the task-start notification
MsgHandler.send_dd_msg("【数据上传HDFS任务】 启动")
start = DateHandler.now_datetime()
# run shell commands
# upload the file to HDFS
cmd_1 = "hdfs dfs -put /data3/hdfs_data/ly_sku_price/sku_" + \
DateHandler.now_date(days=0, d_type=2) + \
".txt " + \
"/ly_sku_price/"
# delete the local file
cmd_2 = "rm -f " + "/data3/hdfs_data/ly_sku_price/sku_" + \
DateHandler.now_date(days=0, d_type=2) + \
".txt"
subprocess.getoutput(cmd_1)
subprocess.getoutput(cmd_2)
# send the task-end notification
end = DateHandler.now_datetime()
dur = DateHandler.cal_duration(start, end, t_type=2)
MsgHandler.send_dd_msg("【数据上传HDFS任务】 结束\n" +
"任务耗时:" + str(dur) + "分钟\n")
@staticmethod
def load_hbase():
# send the task-start notification
MsgHandler.send_dd_msg("【HDFS数据写入HBASE任务】 启动")
start = DateHandler.now_datetime()
msg = ""
# run the ImportTsv bulk-load command
cmd = "/data2/hbase/hbase-2.0.1/bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv " \
"-Dimporttsv.columns=HBASE_ROW_KEY,cf1:sku_id,cf1:tiered,cf1:time " \
"'-Dimporttsv.separator=|' sku:stockgoods /ly_sku_price/sku_" + \
DateHandler.now_date(days=0, d_type=2) + ".txt"
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in p.communicate():
msg += str(info) + "\n"
# write the log to ELK
LogHandler.elk_log(msg, 101, '/data3/dataLog/ly_price_collect_' + DateHandler.now_date(0, 2) + '.log',
'sku_collect')
# send the task-end notification
end = DateHandler.now_datetime()
dur = DateHandler.cal_duration(start, end, t_type=2)
MsgHandler.send_dd_msg("【HDFS数据写入HBASE任务】 结束\n" +
"任务耗时:" + str(dur) + "分钟\n")
@staticmethod
def del_hdfs():
hdfs_file = '/ly_sku_price/sku_' + \
DateHandler.now_date(days=0, d_type=2) + '.txt'
cmd = 'hadoop dfs -rm -r ' + hdfs_file
subprocess.getoutput(cmd)
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
class ExMarket(Base):
def ex_prize_log(self, condition):
# columns to select
col = ['user_id']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Order()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_prize_winner WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Coupon order info
"""
def ex_user_coupon(self, condition):
# columns to select
col = ['c.user_id', 'o.order_amount', 'o.status', 'o.currency']
col_str = super().col_to_str(col)
# connect to the database
db = ConnList.Order()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s \
FROM lie_user_coupon c \
LEFT JOIN lie_order o \
ON c.order_id = o.order_id \
WHERE %s" % (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExOthers(Base):
def email_list(self, condition):
# columns to select
con_str = super().condition_to_str(condition['condition'])
# dashboard database
db = ConnList.Dashboard()
sql = "SELECT email \
FROM \
lie_email_list \
WHERE id > 0 %s" \
% con_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
print('final_result', len(final_result))
return final_result
from extract.ex_base import Base
from config.conn_list import ConnList
from utils.db_handler import DBHandler
class ExPurchase(Base):
"""
Stock alert
"""
def safe_stock(self, condition):
# columns to select
col = ['sku_id', 'b.packing_name']
col_str = super().col_to_str(col)
# WMS database
db = ConnList.Wms()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM \
lie_safe_stock a \
LEFT JOIN lie_packing b \
ON a.packing_id = b.packing_id \
WHERE %s" \
% (col_str, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Purchase order items
i.status 1: normal, -1: deleted
p.status -10: voided, -1: draft, 1: pending review, 4: awaiting shipment, 6: partially shipped, 10: fully shipped, 11: force-completed
"""
def purchase_count(self, condition):
# columns to select
col = ['count(*) as count', 'sum(picking_number * picking_price) amount']
col_str = super().col_to_str(col)
db = ConnList.Wms()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM \
lie_purchase_items i \
LEFT JOIN lie_purchase p ON i.picking_id = p.picking_id \
WHERE p.audit_time BETWEEN %d AND %d AND %s" % (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
rd = []
for row in results:
rd.append({'count': row[0], 'amount': float(row[1]) if row[1] is not None else 0})
return rd
"""
Purchase order items
"""
def purchase_items(self, condition):
# columns to select
col = ['brand_name', '(picking_number * picking_price) amount']
col_str = super().col_to_str(col)
db = ConnList.Wms()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM \
lie_purchase_items i \
LEFT JOIN lie_purchase p ON i.picking_id = p.picking_id \
WHERE p.audit_time BETWEEN %d AND %d AND %s" % (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
rd = []
for row in results:
rd.append({'brand': row[0], 'amount': float(row[1]) if row[1] is not None else 0})
return rd
"""
Pending inbound count
"""
def in_stock(self, condition):
db = ConnList.lxData()
start_time = condition['start_time']
end_time = condition['end_time']
sql = "SELECT count(*) FROM lie_examine WHERE status = 1 AND add_time \
BETWEEN %d AND %d" % (start_time, end_time)
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
return final_result
"""
Total pending inbound count
"""
def wait_stock(self):
db = ConnList.lxData()
sql = "SELECT count(*) FROM lie_goods WHERE status = 0"
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
return final_result
"""
Total pending review count
"""
def wait_examine(self):
# liexin_data database
db = ConnList.lxData()
sql = "SELECT count(*) FROM lie_examine WHERE status = 0"
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
return final_result
@@ -11,7 +11,7 @@ class ExUser(Base):
def reg_user(self, condition):
# columns to select
col = ['user_id', 'create_time', 'mobile', 'email']
col = ['user_id', 'create_time', 'mobile', 'email', 'reg_remark', 'create_device']
col_str = super().col_to_str(col)
# user database
@@ -26,8 +26,217 @@
WHERE \
create_time BETWEEN %d AND %d \
AND is_test = 0 \
AND is_type = 0 %s" \
AND is_type = 0 \
AND create_device != 20 %s" \
% (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Fetch logged-in users
"""
def login_user(self, condition):
# columns to select
col = ['user_id', 'login_remark', 'platform']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s \
FROM \
lie_user_login_log \
WHERE \
last_login_time BETWEEN %d AND %d \
AND user_id NOT IN (SELECT user_id FROM lie_user_main WHERE is_test = 1) %s" \
% (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
User coupons
"""
def user_coupon(self, condition):
# columns to select
col = ['source']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s FROM lie_user_coupon \
WHERE create_time BETWEEN %d AND %d %s" \
% (col_str, start_time, end_time, con_str)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Repeat purchasers
"""
def re_pur_user(self, condition):
# columns to select
col = ['o.user_id']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
count = condition['count']
sql = "SELECT %s \
FROM \
lie_order o \
LEFT JOIN lie_user_main u \
ON o.user_id = u.user_id \
WHERE \
o.create_time <= %d \
AND o.STATUS > 2 \
AND o.order_type = 1 \
AND o.is_type = 0 \
AND u.is_test = 0 \
AND u.create_device != 20 \
GROUP BY o.user_id \
HAVING COUNT( o.user_id ) >= %d" \
% (col_str, start_time, count)
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
return final_result
"""
Repeat purchaser accounts
"""
def re_pur_account(self, condition):
# columns to select
col = ['u.email', 'u.mobile', 'u.create_time']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
count = condition['count']
sql = "SELECT %s \
FROM \
lie_order o \
LEFT JOIN lie_user_main u \
ON o.user_id = u.user_id \
WHERE \
o.create_time <= %d \
AND o.STATUS > 2 \
AND o.order_type = 1 \
AND o.is_type = 0 \
AND u.is_test = 0 \
AND u.create_device != 20 \
GROUP BY o.user_id \
HAVING COUNT( o.user_id ) >= %d" \
% (col_str, start_time, count)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
"""
Historical unregistered users
"""
def his_no_buy(self, condition):
#
col = ['user_id']
col_str = super().col_to_str(col)
sql = "SELECT %s \
FROM lie_user_main \
WHERE is_test = 0 AND is_type = 0" % col_str
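# NOTE: the query above is built but never executed; his_no_buy currently returns None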
"""
First-purchase users
"""
def first_buy_user(self, condition):
# columns to select
col = ['o.user_id', 'o.order_amount', 'o.status', 'o.currency', 'o.pay_time', 'o.sale_id',
'o.create_time', 'i.tax_title', 'i.nike_name', 'u.mobile', 'u.email']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
end_time = condition['end_time']
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT %s \
FROM lie_order o\
LEFT JOIN lie_order_invoice i \
ON o.order_id = i.order_id \
LEFT JOIN lie_user_main u \
ON o.user_id = u.user_id \
WHERE o.create_time BETWEEN %d AND %d \
AND o.order_type = 1 \
AND o.is_type = 0 \
AND u.is_test != 1 \
AND %s \
AND o.user_id NOT IN \
(SELECT user_id FROM lie_order WHERE is_type = 0 AND order_type = 1 AND create_time <= %d GROUP BY user_id)" \
% (col_str, start_time, end_time, con_str, start_time)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
def new_user_order(self, condition):
col = ['o.user_id', 'u.mobile', 'u.email']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
start_time = condition['start_time']
end_time = condition['end_time']
sql = "SELECT %s FROM lie_order o \
LEFT JOIN lie_user_main u \
ON o.user_id = u.user_id \
WHERE o.user_id IN \
( \
SELECT user_id FROM lie_user_main WHERE user_id NOT IN \
( \
SELECT user_id FROM lie_order WHERE is_type = 0 AND order_type = 1 \
AND create_time <= %s \
) \
AND is_test = 0 \
) AND o.create_time \
BETWEEN %s AND %s \
AND o.order_type = 1 AND o.is_type = 0 GROUP BY o.user_id" % (col_str, start_time, start_time, end_time)
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
@@ -35,3 +244,60 @@ class ExUser(Base):
print('final_result', len(final_result))
return final_result
"""
Fetch user ids from a file
"""
def file_user(self):
rd = []
db = ConnList.Order()
data = self.read_local_file('name.txt')
for d in data:
mobile = d.replace('\n', '')
sql = "SELECT user_id,email FROM lie_user_main WHERE mobile = %d" % int(mobile)
print(sql)
results = DBHandler.read(db=db, sql=sql)
if len(results) > 0:
rd.append({'mobile': mobile, 'email': results[0][1], 'user_id': results[0][0]})
return rd
"""
Fetch the quote-system user_id
"""
def lx_offer(self, condition):
# dashboard database
db = ConnList.Dashboard()
con_str = super().condition_to_str(condition['condition'])
sql = "SELECT user_id FROM lie_offer WHERE %s " % con_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to a list
final_result = super().result_to_list(results)
return final_result
"""
Test IPs
"""
def test_ip(self):
# columns to select
col = ['ip']
col_str = super().col_to_str(col)
# user database
db = ConnList.Order()
sql = "SELECT %s \
FROM lie_test_ip"\
% col_str
results = DBHandler.read(db=db, sql=sql)
# convert the results to dicts
final_result = super().result_to_dict(col, results)
return final_result
No preview for this file type
This diff could not be displayed because it is too large.
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
HADOOP_HOME=/data2/hadoop/hadoop-2.7.6
PATH=$PATH:$HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH HADOOP_HOME
!connect jdbc:hive2://localhost:10000
hive
{"/usr/python":{"last_check":"2019-09-24T03:07:38Z","pypi_version":"19.2.3"}}
\ No newline at end of file
This diff could not be displayed because it is too large.
/*
Navicat Premium Data Transfer
Source Server : liexin_credit
Source Server Type : MySQL
Source Server Version : 50548
Source Host : 192.168.2.232:3306
Source Schema : liexin_credit
Target Server Type : MySQL
Target Server Version : 50548
File Encoding : 65001
Date: 13/12/2019 14:18:06
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for lie_com_credits
-- ----------------------------
DROP TABLE IF EXISTS `lie_com_credits`;
CREATE TABLE `lie_com_credits` (
`id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id',
`apply_account` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '申请账号 手机号 法人电话',
`erp_company_code` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'erp公司编码',
`company_name` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '公司名字',
`static_credit_score` decimal(6, 2) UNSIGNED NOT NULL DEFAULT 0.00 COMMENT '静态信用分',
`dynamic_credit_score` decimal(6, 2) UNSIGNED NOT NULL DEFAULT 0.00 COMMENT '动态信用分',
`total_credit_score` decimal(6, 2) NOT NULL DEFAULT 0.00 COMMENT '总信用分',
`status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '1已申请 5待评分 10已评分',
`blacklist_hit` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '黑名单命中',
`credit_invest_result` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '征信达标结果',
`ichunt_hunting_core` tinyint(1) NOT NULL COMMENT '猎芯网评分分级',
`recommended_amount` decimal(12, 2) NOT NULL DEFAULT 0.00 COMMENT '建议额度',
`recom_time_limit` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '建议区间',
`admin_id` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '评分人ID',
`admin_name` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '评分人',
`score_time` int(11) UNSIGNED NOT NULL DEFAULT 0 COMMENT '评分时间',
`create_time` int(11) UNSIGNED NOT NULL DEFAULT 0 COMMENT '创建时间',
`update_time` int(11) UNSIGNED NOT NULL DEFAULT 0 COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `company_name`(`company_name`) USING BTREE,
INDEX `company_code_2`(`erp_company_code`, `status`, `create_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 48 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '公司信用分管理' ROW_FORMAT = Compact;
-- ----------------------------
-- Records of lie_com_credits
-- ----------------------------
INSERT INTO `lie_com_credits` VALUES (2, '13277999723', 'LX001', '猎芯科技', 23.50, 17.00, 40.50, 10, '', '', 4, 50000.00, '5天', 1354, '朱国军', 1575453290, 1574675417, 1575453290);
INSERT INTO `lie_com_credits` VALUES (5, '5695', 'LX002', '小明科技', 16.50, 13.00, 29.50, 10, '', '', 5, 0.00, '0', 1357, '朱继来', 1576047410, 1574695555, 1576047410);
INSERT INTO `lie_com_credits` VALUES (9, '17600091664', 'LX003', '猎芯科技1', 24.25, 0.00, 24.25, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1575531942, 1575355839, 1575531942);
INSERT INTO `lie_com_credits` VALUES (15, '15365025118', 'LX004', '211221', 15.50, 28.00, 43.50, 10, '', '', 4, 50000.00, '5天', 0, '', 0, 1575448470, 0);
INSERT INTO `lie_com_credits` VALUES (16, '18589050841', 'LX005', '18589050841', 16.00, 16.00, 32.00, 10, '', '', 5, 0.00, '0', 0, '', 0, 1575449102, 0);
INSERT INTO `lie_com_credits` VALUES (17, '15989570000', 'LX006', '深圳市猎芯科技有限公司', 10.00, 0.00, 10.00, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1575969536, 1575669102, 1575969536);
INSERT INTO `lie_com_credits` VALUES (18, '15989571111', 'LX007', '深圳卓越飞讯电子有限公司', 10.50, 0.00, 10.50, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1576045211, 1575779102, 1576045211);
INSERT INTO `lie_com_credits` VALUES (19, '15989572222', 'LX008', '北京天涯泰盟科技股份有限公司', 10.00, 0.00, 10.00, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1576045976, 1575889102, 1576045976);
INSERT INTO `lie_com_credits` VALUES (20, '13397978887', 'LX008', '深圳市同创芯科技有限公司', 10.00, 0.00, 10.00, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1575968590, 1575966673, 1575968590);
INSERT INTO `lie_com_credits` VALUES (21, '13397978829', 'LX007', '深圳市极限网络科技有限公司', 10.50, 0.00, 10.50, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1575969906, 1575969833, 1575969906);
INSERT INTO `lie_com_credits` VALUES (22, '13640960251', 'LX007', '深圳市品慧电子有限公司', 10.00, 0.00, 10.00, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1576035832, 1575970573, 1576035832);
INSERT INTO `lie_com_credits` VALUES (23, '13388880000', 'LX007', '深圳市百姓通商网络科技有限公司', 9.50, 0.00, 9.50, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1575970936, 1575970821, 1575970936);
INSERT INTO `lie_com_credits` VALUES (24, '16600001111', 'LX007', '深圳市锦懋微电子有限公司', 10.50, 0.00, 10.50, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1576048947, 1576048830, 1576048947);
INSERT INTO `lie_com_credits` VALUES (25, '13602602902', '', '深圳一二三科技有限公司', 10.00, 0.00, 10.00, 10, '', '', 1, 300.00, '1天', 1357, '朱继来', 1576118530, 1576050156, 1576118530);
INSERT INTO `lie_com_credits` VALUES (26, '13012340000', '', '创业集团', 9.50, 0.00, 9.50, 10, '', '', 1, 300.00, '1天', 0, '', 0, 1576053465, 0);
INSERT INTO `lie_com_credits` VALUES (27, '13012340001', '', '深圳明主科技有限公司1', 10.00, 0.00, 1.00, 10, '', '', 1, 300.00, '1天', 1354, '朱国军', 1576058659, 1576054647, 1576058659);
INSERT INTO `lie_com_credits` VALUES (28, '13012340003', '', '自由公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 1357, '朱继来', 1576057792, 1576056986, 1576057792);
INSERT INTO `lie_com_credits` VALUES (29, '13012340000', 'LX008', '完美世界游戏有限责任公司', 11.00, 0.00, 11.00, 10, '', '', 5, 0.00, '0', 1354, '朱国军', 1576144150, 1576118006, 1576144150);
INSERT INTO `lie_com_credits` VALUES (30, '13602602902', '', '扫毒2A', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 1515, '孙龙', 1576134327, 1576120755, 1576134327);
INSERT INTO `lie_com_credits` VALUES (31, '13602602902', '', 'company', 6.00, 0.00, 6.00, 1, '', '', 1, 300.00, '1天', 0, '', 0, 1576131241, 1576131241);
INSERT INTO `lie_com_credits` VALUES (32, '17600091664', '', '中印云端(深圳)科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134467, 1576134467);
INSERT INTO `lie_com_credits` VALUES (33, '17600091664', '', '深圳卓越飞讯科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134486, 1576134486);
INSERT INTO `lie_com_credits` VALUES (34, '17600091664', '', '深圳市展创电子有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134503, 1576134503);
INSERT INTO `lie_com_credits` VALUES (35, '17600091664', '', '深圳市义熙科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134511, 1576134511);
INSERT INTO `lie_com_credits` VALUES (36, '17600091664', '', '深圳市耀亮科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134518, 1576134518);
INSERT INTO `lie_com_credits` VALUES (37, '17600091664', '', '深圳市尚格实业有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134534, 1576134534);
INSERT INTO `lie_com_credits` VALUES (38, '17600091664', '', '深圳市晶尚景电子科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134542, 1576134542);
INSERT INTO `lie_com_credits` VALUES (39, '17600091664', '', '深圳市锦锐科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134548, 1576134548);
INSERT INTO `lie_com_credits` VALUES (40, '17600091664', '', '深圳市弘安盛电子有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134562, 1576134562);
INSERT INTO `lie_com_credits` VALUES (41, '17600091664', '', '深圳市和世达电子科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134568, 1576134568);
INSERT INTO `lie_com_credits` VALUES (42, '17600091664', '', '深圳市禾田普达科技有限公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576134574, 1576134574);
INSERT INTO `lie_com_credits` VALUES (43, '17600091664', '', '上海麦霖电子技术有限公司', 0.00, 0.00, 0.00, 5, '', '', 0, 0.00, '', 1515, '孙龙', 1576217604, 1576134581, 1576217604);
INSERT INTO `lie_com_credits` VALUES (44, '17600091664', '', '成都鸿悦科技有限公司', 0.00, 0.00, 0.00, 5, '', '', 0, 0.00, '', 1515, '孙龙', 1576216747, 1576134587, 1576216747);
INSERT INTO `lie_com_credits` VALUES (45, '17600091664', 'GGN0001477', '北京大唐高鸿数据网络技术有限公司', 10.00, 0.00, 10.00, 10, '', '', 5, 0.00, '0', 1515, '孙龙', 1576199464, 1576134600, 1576199464);
INSERT INTO `lie_com_credits` VALUES (46, '18912340000', '', '小红公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576205207, 1576205207);
INSERT INTO `lie_com_credits` VALUES (47, '13602602902', '', '小学公司', 0.00, 0.00, 0.00, 1, '', '', 0, 0.00, '', 0, '', 0, 1576206833, 1576206833);
SET FOREIGN_KEY_CHECKS = 1;
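A hedged Python sketch, separate from the SQL dump above, for reading scored companies back out of lie_com_credits (status = 10 means scored); the column choice is illustrative:

from config.conn_list import ConnList
from utils.db_handler import DBHandler
credit_db = ConnList.Credit()
rows = DBHandler.read(credit_db, "SELECT company_name,total_credit_score FROM lie_com_credits WHERE status = 10")
for company_name, total_score in rows:
    print(company_name, total_score)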
from utils.date_handler import DateHandler
from utils.msg_handler import MsgHandler
from utils.log_handler import LogHandler
import subprocess
class LoadHbase:
"""
sep: 分隔符
"""
@staticmethod
def cmd_load(row, table, hdfs_file, message, sep):
# 执行CMD命令
msg = ""
cmd = "/data2/hbase/hbase-2.0.1/bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.columns=HBASE_ROW_KEY,%s '-Dimporttsv.separator=%s' %s %s" % (row, sep, table, hdfs_file)
# cmd = "/data2/hbase/hbase-2.0.1/bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv " \
# "-Dimporttsv.columns=HBASE_ROW_KEY," + row + \
# " '-Dimporttsv.separator=|' " + table + " " + hdfs_file
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in p.communicate():
msg += str(info) + "\n"
# 日志写道ELK
LogHandler.elk_log(msg, 101, '/data3/dataLog/ickey_index' + DateHandler.now_date(0, 2) + '.log',
'ickey_index')
# 发送信息
MsgHandler.send_dd_msg(message)
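A hedged usage sketch for LoadHbase.cmd_load; the column mapping, table name and HDFS path below are illustrative assumptions:

from load.load_hbase import LoadHbase
row = "cf1:colA,cf1:colB"
LoadHbase.cmd_load(row, "demo:table", "/demo/data.txt", "demo load finished", "|")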
from extract.ex_base import Base
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
class LoadMysql:
"""
col:字段
table:表
data:数据
db:数据库
"""
@staticmethod
def sample_load(col, table, data, db, cal_time=True):
col_to_str = Base.col_to_str(col)
for d in data:
if cal_time:
data[d]['insert_time'] = DateHandler.now_datetime()
data[d]['cal_ts'] = DateHandler.now_date(1, 1)
value_to_str = Base.value_to_str(col, data[d])
sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, col_to_str, value_to_str)
DBHandler.insert(db, sql)
db.close()
"""
col:字段
table:表
data:数据
db:数据库
"""
@staticmethod
def simple_dict_load(col, table, data, db, cal_time=True):
col_to_str = Base.col_to_str(col)
for d in data:
if cal_time:
d['cal_ts'] = DateHandler.date_time(1)
value_to_str = Base.value_to_str(col, d)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, col_to_str, value_to_str)
DBHandler.insert(db, sql)
db.close()
@staticmethod
def keyword_load(col, table, value, db):
col_to_str = Base.col_to_str(col)
value_to_str = Base.col_to_str(value)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, col_to_str, value_to_str)
DBHandler.insert(db, sql)
db.close()
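A hedged usage sketch for LoadMysql.simple_dict_load; the table and columns are illustrative assumptions (cal_time=False skips the automatic cal_ts stamp):

from load.load_mysql import LoadMysql
from config.conn_list import ConnList
rows = [{'name': 'demo', 'num': 1}, {'name': 'demo2', 'num': 2}]
LoadMysql.simple_dict_load(['name', 'num'], 'lie_demo_table', rows, ConnList.Dashboard(), cal_time=False)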
from utils.excel_handler import ExcelHandler
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from config.conn_list import ConnList
import datetime
data = ExcelHandler.read_to_excel('待分配客户-重新分配.xls', 'Sheet1', 1)
data.pop(0)
db = ConnList.Order()
ic_db = ConnList.IcData()
ds_db = ConnList.WrCrm()
now_ts = DateHandler.now_datetime()
com_types = {'默认': 0, '代工厂': 1, '终端': 2, '代理商': 3, '贸易商': 4, '其他': 5}
cus_types = {'默认': 0, '老板': 1, '采购': 2, '工程师': 3, '学生': 4, '其他': 5}
print(len(data))
for row in data:
email = str(row[0]).strip()
mobile = str(row[1]).strip()
company = str(row[2]).strip()
sale_man = str(row[3]).strip()
if mobile != '':
sql = "SELECT user_id FROM lie_user_main WHERE mobile = \'%s\'" % mobile
else:
sql = "SELECT user_id FROM lie_user_main WHERE email = \'%s\'" % email
user = DBHandler.read(db, sql)
if len(user) > 0:
outter_uid = user[0][0]
sql = "select userId from user_info where name = \'%s\'" % sale_man
sale = DBHandler.read(db=ic_db, sql=sql)
if len(sale) > 0:
sale_id = sale[0][0]
sql = "SELECT user_id FROM lie_user WHERE outter_uid = %d AND source IN (1,2)" % outter_uid
is_exist = DBHandler.read(ds_db, sql)
if len(is_exist) > 0:
user_id = is_exist[0][0]
sql = "UPDATE lie_salesman SET user_id = %d,sale_id = %d WHERE user_id = %d" % (user_id, sale_id, outter_uid)
DBHandler.update(ds_db, sql)
# print(len(data))
from config.conn_list import ConnList
from utils.db_handler import DBHandler
db = ConnList.Order()
r = ConnList.WriteRedis23()
sql = "SELECT i.brand_id, COUNT(i.brand_id) FROM lie_order_items i \
LEFT JOIN lie_order o ON i.order_id = o.order_id \
WHERE i.goods_type = 2 AND i.status = 1 AND i.brand_id != 0 \
GROUP BY i.brand_id ORDER BY COUNT(i.brand_id) DESC"
result = DBHandler.read(db, sql)
print(len(result))
for row in result:
print(row)
brand_id = row[0]
brand_count = row[1]
# r.hset("Top_Seller_Brand", brand_id, brand_count)
from config.conn_list import ConnList
from utils.db_handler import DBHandler
chain_db = ConnList.Chain()
credit_db = ConnList.Credit()
sql = "SELECT id,company_name FROM lie_com_credits WHERE erp_company_code = \'\'"
result = DBHandler.read(credit_db, sql)
for row in result:
com_id = row[0]
company_name = row[1]
sql = "SELECT customer_code \
FROM lie_company c \
LEFT JOIN lie_customer u ON c.company_id = u.company_id\
WHERE company_full_name = \'%s\' LIMIT 1" % company_name
exist = DBHandler.read(chain_db, sql)
if len(exist) > 0:
company_code = exist[0][0]
if company_code is not None:
sql = "UPDATE lie_com_credits SET erp_company_code = \'%s\' WHERE id = %d" % (company_code, com_id)
DBHandler.update(credit_db, sql)
\ No newline at end of file
from pipeline.pi_behavior import PiBehavior
from pipeline.pi_tag import PiTag
from pipeline.pi_goods import PiGoods
from pipeline.pi_page import PiPage
from pipeline.pi_activity import PiActivity
from pipeline.pi_daily import PiDaily
from pipeline.pi_weekly import PiWeekly
from pipeline.pi_lx_log import PiLxLog
from pipeline.pi_lx_log_url import PiLxLogUrl
from pipeline.pi_rank import PiRank
from utils.date_handler import DateHandler
import sys
if __name__ == '__main__':
if len(sys.argv) > 1:
if sys.argv[1] == 'sum_behavior': # daily behavior summary
PiBehavior.pipeline_sum_behavior()
if sys.argv[1] == 'zy_adtag': # self-operated adtag
PiTag().pipeline_zy_adtag()
if sys.argv[1] == 'all_adtag': # all-channel adtag
PiTag().pipeline_all_adtag()
if sys.argv[1] == 'erp_order': # joint-operated offline orders
PiGoods.pipeline_special_canal()
if sys.argv[1] == 'page_trans': # page conversion
PiPage().pipeline_pageTag()
if sys.argv[1] == 'ac_cal': # activity calculation
PiActivity.pipeline_daily_activity()
if sys.argv[1] == 'share_friend': # invite friends
PiActivity.pipeline_wallet_activity()
if sys.argv[1] == 'week_hot_goods': # weekly hot goods
PiWeekly.pipeline_week_hot_goods()
if sys.argv[1] == 'week_classify_hot_goods': # weekly hot goods by first/second-level category
PiWeekly.pipeline_week_classify_hot_goods()
if sys.argv[1] == 'lx_brand': # weekly brands
PiWeekly.pipeline_lx_brand()
if sys.argv[1] == 'lx_order': # weekly orders
PiWeekly.pipeline_lx_order()
if sys.argv[1] == 'crm_user': # update CRM user info
PiDaily.pipeline_crm_user()
if sys.argv[1] == 'lx_log': # log parsing
pi_lx_log = PiLxLog()
pi_lx_log.pipeline_lx_log()
if sys.argv[1] == 'lx_log_url':
pi_lx_log_url = PiLxLogUrl()
pi_lx_log_url.pipeline_lx_log_url()
if sys.argv[1] == 'ly_log':
pi_ly_log = PiLxLog()
pi_ly_log.pipeline_ly_log()
if sys.argv[1] == 'lx_rank1':
start_time = DateHandler.date_time(1)
end_time = DateHandler.date_time(0)
PiRank().rank_one(start_time, end_time)
if sys.argv[1] == 'lx_rank3':
PiRank().rank_three()
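Each branch above is selected by the first command-line argument; hedged invocation examples (the script filename is an assumption):

python run_pipeline.py lx_log
python run_pipeline.py week_hot_goods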
from utils.excel_handler import ExcelHandler
from translate.ts_behavior import TsBehavior
from extract.ex_user import ExUser
from extract.ex_behavior import ExBehavior
from extract.ex_dashboard import ExDashboard
from extract.ex_order import ExOrder
from utils.date_handler import DateHandler
from load.load_mysql import LoadMysql
from config.conn_list import ConnList
from utils.db_handler import DBHandler
import time
class PiBehavior:
@staticmethod
def pipeline_user_behavior():
where = {'start_time': 1543593600, 'end_time': 1546272000, 'condition': []}
ex_user = ExUser('新用户购买')
# first-purchase users
# data = ex_user.new_user_order(where)
# newly registered users
data = ex_user.reg_user(where)
# users from a local file
# data = ex_user.file_user()
ts_be = TsBehavior(name='行为转化', data=data)
ts_be.trans_user_behavior(where)
@staticmethod
def pipeline_sum_behavior():
# collect test IPs
test_ip = []
data = ExUser('测试ip').test_ip()
for d in data:
test_ip.append(d['ip'])
# behavior data
where = {'start_time': DateHandler.date_time(1), 'end_time': DateHandler.date_time(0),
'condition': ['behavior_id > 0']}
ex_be = ExBehavior('读取行为日志')
be_data = ex_be.behavior_log(where)
# transform
ts_be = TsBehavior(name='行为日志计算', data=be_data)
be_data_ts = ts_be.trans_sum_behavior(test_ip=test_ip)
# order data
# PC
where = {'condition': ['cal_ts = \'' + DateHandler.now_date(1, 1) + '\'',
'order_type = 1']}
ex_order = ExDashboard('读取订单')
od_data = ex_order.sum_order(where)
be_data_ts['new_order_count']['pc'] = int(od_data[0]['sum(order_count)'])
be_data_ts['pay_order_count']['pc'] = int(od_data[0]['sum(order_paid_count)'])
# H5
where = {'condition': ['cal_ts = \'' + DateHandler.now_date(1, 1) + '\'',
'order_type = 2']}
ex_order = ExDashboard('读取订单')
od_data = ex_order.sum_order(where)
be_data_ts['new_order_count']['h5'] = int(od_data[0]['sum(order_count)'])
be_data_ts['pay_order_count']['h5'] = int(od_data[0]['sum(order_paid_count)'])
# write results
col = ['search_count', 'custom_count', 'addcart_count', 'buy_count', 'confirm_count',
'pay_count', 'new_order_count', 'pay_order_count']
# PC
value = []
for c in col:
value.append(str(be_data_ts[c]['pc']))
col.append('insert_time')
col.append('cal_ts')
col.append('platform')
value.append(str(DateHandler.now_datetime()))
value.append('\'' + DateHandler.now_date(1, 1) + '\'')
value.append('1')
LoadMysql.keyword_load(col=col, table='lie_sum_behavior_cal', value=value, db=ConnList.Dashboard())
# H5
col = ['search_count', 'custom_count', 'addcart_count', 'buy_count', 'confirm_count',
'pay_count', 'new_order_count', 'pay_order_count']
value = []
for c in col:
value.append(str(be_data_ts[c]['h5']))
col.append('insert_time')
col.append('cal_ts')
col.append('platform')
value.append(str(DateHandler.now_datetime()))
value.append('\'' + DateHandler.now_date(1, 1) + '\'')
value.append('2')
LoadMysql.keyword_load(col=col, table='lie_sum_behavior_cal', value=value, db=ConnList.Dashboard())
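The PC and H5 blocks above are identical except for the platform flag; a hedged refactor sketch (the helper name is an assumption):

def load_platform(be_data_ts, platform_key, platform_flag):
    col = ['search_count', 'custom_count', 'addcart_count', 'buy_count', 'confirm_count',
           'pay_count', 'new_order_count', 'pay_order_count']
    value = [str(be_data_ts[c][platform_key]) for c in col]
    col += ['insert_time', 'cal_ts', 'platform']
    value += [str(DateHandler.now_datetime()), "'" + DateHandler.now_date(1, 1) + "'", platform_flag]
    LoadMysql.keyword_load(col=col, table='lie_sum_behavior_cal', value=value, db=ConnList.Dashboard())

# load_platform(be_data_ts, 'pc', '1')
# load_platform(be_data_ts, 'h5', '2')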
@staticmethod
def pipeline_all_behavior():
where = {'start_time': 1533052800, 'end_time': 1548950400, 'condition': ['behavior_id > 0']}
ex_be = ExBehavior('行为转化')
data = ex_be.behavior_log(where)
print(len(data))
ts_be = TsBehavior(name="行为转化", data=data)
ts_be.trans_output_behavior()
@staticmethod
def pipeline_order_behavior():
where = {'start_time': 1541001600, 'end_time': 1543593600, 'condition': ['order_id > 0']}
ex_order = ExOrder('订单详情')
data = ex_order.all_order(where)
print(len(data))
ts_be = TsBehavior(name="行为转化", data=data)
ts_be.trans_order_behavior()
@staticmethod
def pipeline_reg_without_login():
where = {'start_time': 1, 'end_time': 1546444800,
'condition': ['AND user_id NOT IN (SELECT user_id FROM lie_user_login_log \
WHERE last_login_time BETWEEN 1538496000 AND 1546444800 GROUP BY user_id)']}
ex_user = ExUser('注册未登录用户')
data = ex_user.reg_user(where)
print(len(data))
ts_be = TsBehavior(name="会员行为", data=data)
ts_be.trans_user_exit_behavior('(1,2,3)')
@staticmethod
def pipeline_no_login():
ul = []
tl = []
fl = {}
title = ['账号', '时间']
content = ['mobile', 'create_time']
sql = "SELECT user_id FROM lie_user_main WHERE is_test = 1"
db = ConnList.Order()
test = DBHandler.read(db, sql)
for row in test:
tl.append(row[0])
db.close()
sql = "SELECT user_id FROM lie_behavior_log WHERE create_time BETWEEN %d AND %d AND behavior in (4, 11) GROUP BY user_id" % (0, 1554185690)
db = ConnList.Behavior()
behavior = DBHandler.read(db, sql)
for row in behavior:
if row[0] not in tl and row[0] not in ul:
ul.append(row[0])
db.close()
sql = "SELECT user_id FROM lie_order WHERE order_type = 1 AND is_type != 0 GROUP BY user_id"
db = ConnList.Order()
order = DBHandler.read(db, sql)
for row in order:
if row[0] not in tl and row[0] not in ul:
ul.append(row[0])
index = 0
for user_id in ul:
sql = "SELECT max(last_login_time),mobile,email,create_time FROM lie_user_login_log l LEFT JOIN lie_user_main u ON u.user_id = l.user_id WHERE l.user_id = %d" % user_id
result = DBHandler.read(db, sql)
if len(result) > 0 and result[0][0] is not None:
if 1554185690 - result[0][0] > 86400 * 60:
fl[index] = {'mobile': result[0][1] if result[0][1] != '' else result[0][2],
'create_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(result[0][3]))}
index += 1
print(index)
else:
print('None')
print(user_id)
break
ExcelHandler.write_to_excel_with_openpyxl(title, content, fl, "result.xls")
from utils.msg_handler import MsgHandler
from utils.date_handler import DateHandler
from extract.ex_dashboard import ExDashboard
class PiEmail:
"""
专卖下架邮件
"""
@staticmethod
def pipeline_zm_down():
# fetch data
key = 'djdj93ichuntj56dksisearchdj45eieapi'
url = "http://so12.ichunt.com/search/ServerApi/encodedCount"
data = MsgHandler.get_encrypt_msg(key, url)
# fetch the recipient list
condition = {'condition': ['email_id = 5', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# reshape data for the email template
send_data = {'code': '', 'num': '', 'canal': '', 'yes_day': DateHandler.now_date(1, 1)}
for d in data:
send_data['code'] += d['encoded'] + '<br>'
send_data['num'] += d['num'] + '<br>'
send_data['canal'] += d['canal'] + '<br>'
# send the email
MsgHandler.send_email(send_data, email_list, 'zm-down-daily')
"""
专卖提前下架预警
"""
@staticmethod
def pipeline_zm_pre_down(data):
# fetch the recipient list
condition = {'condition': ['email_id = 8', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# reshape data for the email template
send_data = {'code': '', 'num': '', 'canal': '', 'yes_day': DateHandler.now_date(1, 1)}
for d in data:
send_data['code'] += str(d['purchase_uid']) + '<br>'
send_data['num'] += str(d['count']) + '<br>'
send_data['canal'] += d['supplier_code'] + '<br>'
# send the email
MsgHandler.send_email(send_data, email_list, 'zm-warn')
"""
库存预警
"""
@staticmethod
def pipeline_safe_stock(count):
# fetch the recipient list
condition = {'condition': ['email_id = 3', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# reshape data for the email template
info = '下载库存预警文件' if count > 0 else '无库存预警文件下载'
link = 'http://shuju.ichunt.net/analysis/stockDownload?' + DateHandler.now_date(0, 1) if count > 0 else ''
send_data = {'day': DateHandler.now_date(0, 1),
'num': count,
'info': info,
'link': link}
# send the email
MsgHandler.send_email(send_data, email_list, 'stock-report')
"""
sku曝光
"""
@staticmethod
def pipeline_sku_expose(data):
# fetch the recipient list
condition = {'condition': ['email_id = 6', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# send the email
MsgHandler.send_email(data, email_list, 'sku-expose')
"""
搜索无结果
"""
@staticmethod
def pipeline_search_no_result(data, keyword):
# fetch the recipient list
condition = {'condition': ['email_id = 2', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# send the email
MsgHandler.send_email(data, email_list, keyword)
"""
核心日报
"""
@staticmethod
def pipeline_core_report(data, keyword):
# fetch the recipient list
condition = {'condition': ['email_id = 1', 'status = 1']}
email_list = ExDashboard('采集').email_list(condition)
# send the email
MsgHandler.send_email(data, email_list, keyword)
@staticmethod
def pipeline_zyly_match(data):
# fetch the recipient list
condition = {'condition': ['email_id = 7', 'status = 1']}
email_list = ExDashboard('').email_list(condition)
# send the email
MsgHandler.send_email(data, email_list, 'zyly_price_match')
@staticmethod
def pipeline_operate_daily():
# fetch the recipient list
condition = {'condition': ['email_id = 9', 'status = 1']}
email_list = ExDashboard('').email_list(condition)
# build the report link
link = 'http://shuju.ichunt.net/api/ApiOperateDaily?day=' + DateHandler.now_date(0, 1)
send_data = {'day': DateHandler.now_date(0, 1),
'link': link}
# send the email
MsgHandler.send_email(send_data, email_list, 'operate_daily')
@staticmethod
def pipeline_operate_weekly():
# fetch the recipient list
condition = {'condition': ['email_id = 10', 'status = 1']}
email_list = ExDashboard('').email_list(condition)
# build the report link
link = 'http://shuju.ichunt.net/api/ApiOperateWeekly'
send_data = {'link': link}
# send the email
MsgHandler.send_email(send_data, email_list, 'operate_weekly')
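A hedged usage sketch: each PiEmail pipeline resolves its recipient list internally by email_id and only needs its precomputed inputs; the count below is illustrative:

from pipeline.pi_email import PiEmail
PiEmail.pipeline_safe_stock(count=3)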
from extract.ex_ickey import ExIckey
from extract.ex_goods import ExGoods
from extract.ex_order import ExOrder
from extract.ex_dashboard import ExDashboard
from extract.ex_purchase import ExPurchase
from translate.ts_goods import TsGoods
from load.load_mysql import LoadMysql
from config.conn_list import ConnList
from utils.date_handler import DateHandler
from pipeline.pi_email import PiEmail
class PiGoods:
"""
自营猎芯联营SKU匹配
"""
@staticmethod
def pipeline_zylxly_goods():
where = {'condition': ['status = 1']}
ex_goods = ExGoods('自营数据')
zy_data = ex_goods.ex_zy_goods(where)
# fetch Liexin joint-operated SKU_IDs
data = ex_goods.ex_ly_goods()
# fetch Liexin joint-operated part numbers
ts_goods = TsGoods('', data)
ts_data = ts_goods.trans_goods_name()
# match identical part numbers across self-/joint-operated goods
print(len(zy_data), len(ts_data))
final_result = ts_goods.trans_match_name(zy_data, ts_data, 6.88, 1.16)
match_result = ts_goods.trans_zy_low_price(final_result)
"""
自营备货
"""
@staticmethod
def pipeline_zy_stock():
# fetch Ickey index data
ex_ickey = ExIckey('')
ickey = ex_ickey.ex_ickey_index_goods()
# fetch LCSC self-operated data
ts_goods = TsGoods(name='', data=ickey)
ts_goods.trans_lc_data()
ts_goods.trans_lx_search()
# final_result = ts_goods.return_data()
# write to DB
ts_goods.trans_load_gn()
"""
专卖下架提前预警
"""
@staticmethod
def pipeline_zm_warn():
# fetch joint-operated supplier data
where = {'condition': ['status = 2']}
ex_goods = ExGoods('')
sup = ex_goods.ex_ly_supplier(where)
# process data
ts_goods = TsGoods('', data=sup)
ts_sup = ts_goods.trans_zm_warn()
# send the email
PiEmail.pipeline_zm_pre_down(ts_sup)
"""
处理指定人员下的联营渠道数据
参数:
10034 陈泽彬
10044 庞力
10076 许金荣
"""
@staticmethod
def pipeline_special_canal():
guy = [[10034, '陈泽彬'], [10044, '庞力'], [10076, '许金荣']]
guy_str = '('
# name_str = '('
for i in range(0, len(guy)):
if i != len(guy) - 1:
guy_str += str(guy[i][0]) + ', '
# name_str += '\'' + str(guy[i][1]) + '\'' + ', '
else:
guy_str += str(guy[i][0]) + ')'
# name_str += '\'' + str(guy[i][1]) + '\'' + ')'
where = {'condition': ['status = 2', 'channel_uid in %s' % guy_str]}
erp_where = {'start_time': DateHandler.date_time(1),
'end_time': DateHandler.date_time(0),
# 'condition': ['sale_man in %s' % name_str]
'condition': ['source_type = \'ERP\'']}
sup = ExGoods('').ex_ly_supplier(where)
erp = ExGoods('').ex_erp_goods(erp_where)
rd = TsGoods('', sup).trans_special_canal(erp)
col = ['supplier_code', 'supplier_name', 'effect_num', 'order_num', 'cal_ts', 'insert_time']
LoadMysql.sample_load(col, 'lie_special_order_cal', rd, db=ConnList.Dashboard(), cal_time=False)
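The guy_str loop above can be condensed to a single join; a hedged equivalent:

# guy_str = '(' + ', '.join(str(g[0]) for g in guy) + ')'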
"""
自营每日汇总
汇总字段
品牌
一级类目
"""
@staticmethod
def pipeline_zy_daily_summary():
# self-operated exposure data
zy_ep = ExDashboard('').zy_goods({'start_time': DateHandler.date_time(1)})
o_w = {'start_time': DateHandler.date_time(1),
'end_time': DateHandler.date_time(0),
'condition': ['o.order_goods_type = 2']}
# o_w = {'start_time': DateHandler.date_time(27),
# 'end_time': DateHandler.date_time(-1),
# 'condition': ['o.order_goods_type = 2']}
# order data
order = ExOrder('').order_items(o_w)
o_w['condition'].append('o.status > 2')
p_order = ExOrder('').order_paid_items(o_w)
# purchase data
pur = ExPurchase('').purchase_items({'start_time': DateHandler.date_time(1),
'end_time': DateHandler.date_time(0),
'condition': ['p.status > 1']})
# exposure calculation
rd = TsGoods('', zy_ep).trans_zy_expose_type()
# order calculation
TsGoods('', order).trans_zy_order_type(rd)
TsGoods('', p_order).trans_zy_paid_order_type(rd)
# purchase calculation
TsGoods('', pur).trans_zy_purchase(rd)
# write to DB
col = ['expose', 'order_count', 'paid_order', 'cost', 'pur_cost', 'name_type', 'name', 'cal_ts']
for r in ['brand', 'class']:
result = []
for d in rd[r]:
rd[r][d]['name_type'] = 1 if r == 'brand' else 2
rd[r][d]['name'] = d
rd[r][d]['cal_ts'] = DateHandler.now_date(1, 1)
# rd[r][d]['cal_ts'] = DateHandler.now_date(0, 1)
result.append(rd[r][d])
LoadMysql.simple_dict_load(col,
'lie_zy_summary',
result,
db=ConnList.Dashboard(),
cal_time=False)
import subprocess
import time
import re
from utils.date_handler import DateHandler
from utils.db_handler import DBHandler
from config.conn_list import ConnList
class PiLxLog:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.db = ConnList.Dashboard()
self.page_count = {}
self.status_count = {}
self.ip_section = {}
self.day = DateHandler.now_date(1, 1)
self.mon = {'JAN': '01', 'FEB': '02', 'MAR': '03', 'APR': '04', 'MAY': '05', 'JUN': '06', 'JUL': '07', 'AUG': '08', 'SEP': '09', 'OCT': '10', 'NOV': '11', 'DEC': '12'}
self.match = {
'zy': '((.*)/item/(.*)|(.*)/xianhuo(.*)|(.*)/product(.*))',
'ly': '((.*)goods_(.*)|(.*)/spu/(.*)|(.*)/brand/list/(.*))',
'search': '((.*)/s/(.*))',
'ic': '((.*)/ic/(.*))',
'info': '((.*)news.html(.*)|(.*)/article/(.*))'
}
self.block_ip = ['39.156.65', '47.92.125', '14.215.176']
def merge_file(self):
# copy HDFS data to local disk
day = self.day
hdfs_file = '/lx_log/lx_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
des_file = ' /data3/hdfs_data/lx_log'
cmd = 'hadoop dfs -get ' + hdfs_file + des_file
subprocess.getoutput(cmd)
# merge small files
cmd = 'find' + des_file + ' -type f -exec cat {} \; >' + des_file + '/sum'
subprocess.getoutput(cmd)
# upload the merged file back to HDFS
cmd = 'hadoop dfs -put /data3/hdfs_data/lx_log/sum ' + '/lx_log/lx_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2]
subprocess.getoutput(cmd)
# delete local files
cmd = 'rm -f /data3/hdfs_data/lx_log/*'
subprocess.getoutput(cmd)
# delete small files on HDFS
hdfs_file = '/lx_log/lx_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
cmd = 'hadoop dfs -rm -r ' + hdfs_file
subprocess.getoutput(cmd)
def merge_ly_file(self):
# copy HDFS data to local disk
day = self.day
hdfs_file = '/ly_log/ly_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
des_file = ' /data3/hdfs_data/ly_log'
cmd = 'hadoop dfs -get ' + hdfs_file + des_file
subprocess.getoutput(cmd)
# merge small files
cmd = 'find' + des_file + ' -type f -exec cat {} \; >' + des_file + '/sum'
subprocess.getoutput(cmd)
# upload the merged file back to HDFS
cmd = 'hadoop dfs -put /data3/hdfs_data/ly_log/sum ' + '/ly_log/ly_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2]
subprocess.getoutput(cmd)
# delete local files
cmd = 'rm -f /data3/hdfs_data/ly_log/*'
subprocess.getoutput(cmd)
# delete small files on HDFS
hdfs_file = '/ly_log/ly_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + '/small/*'
cmd = 'hadoop dfs -rm -r ' + hdfs_file
subprocess.getoutput(cmd)
def get_ly_hdfs_data(self):
# read data from HDFS
day = self.day
hdfs_file = '/ly_log/ly_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + "/sum"
hdfs_data = DBHandler.hdfs_read(hdfs_file)
return hdfs_data
def get_hdfs_data(self):
# read data from HDFS
day = self.day
hdfs_file = '/lx_log/lx_log_' + day.split('-')[0][2:4] + day.split('-')[1] + day.split('-')[2] + "/sum"
hdfs_data = DBHandler.hdfs_read(hdfs_file)
return hdfs_data
def wash_data(self, data):
f = open('/data3/hdfs_data/spider_url/url.txt', 'w')
for row in data:
try:
row = row.decode("utf-8")
if 'Baiduspider' in row:
arr = row.split(" ")
ip = arr[0]
time_base = arr[3]
url = arr[6]
status = int(arr[8])
# skip rows from blocked IPs (the original inner-loop continue never skipped the row)
if any(b in ip for b in self.block_ip): continue
time_str = self.time_analyzing(time_base)
time_ts = DateHandler.str_to_unix(time_str)
page = self.url_analyzing(url)
self.ip_section_analyzing(ip)
if page not in self.page_count:
self.page_count[page] = 1
else:
self.page_count[page] += 1
if status not in self.status_count:
self.status_count[status] = 1
else:
self.status_count[status] += 1
if status >= 400:
self.insert_seo_url(url, status, time_ts, 'lie_seo_spider_url')
else:
line = url + "|" + str(time_ts) + "|" + ip
f.write(line + "\n")
f.flush()
except:
pass
self.insert_seo_status(self.status_count, 'lie_seo_spider_status')
self.insert_seo_daily(self.page_count)
self.insert_seo_ip_section(self.ip_section)
f.close()
def wash_ly_data(self, data):
for row in data:
try:
row = row.decode("utf-8")
arr = row.split(" ")
url = arr[6]
status = int(arr[8])
time_base = arr[3]
time_str = self.time_analyzing(time_base)
time_ts = DateHandler.str_to_unix(time_str)
if '.html' in url or '/' == url:
if status not in self.status_count:
self.status_count[status] = 1
else:
self.status_count[status] += 1
if status >= 400:
self.insert_seo_url(url, status, time_ts, 'lie_seo_ly_url')
except:
pass
self.insert_seo_status(self.status_count, 'lie_seo_ly_status')
# timestamp parsing
def time_analyzing(self, time_str):
# example: 29/Aug/2019:15:38:00
time_str = time_str[1:]
split_1 = time_str.split("/")
split_2 = split_1[2].split(":")
day = split_1[0]
mon = self.mon[split_1[1].upper()]
year = split_2[0]
hour = split_2[1]
minute = split_2[2]
sec = split_2[3]
return (year + '-' + mon + '-' + day + ' ' + hour + ':' + minute + ':' + sec)
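# A worked example of time_analyzing: the leading '[' is stripped, so
# time_analyzing('[29/Aug/2019:15:38:00') returns '2019-08-29 15:38:00'.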
# URL classification
def url_analyzing(self, url):
zy = self.match['zy']
ly = self.match['ly']
search = self.match['search']
ic = self.match['ic']
info = self.match['info']
if re.match(zy, url, re.M | re.I):
return '自营'
elif re.match(ly, url, re.M | re.I):
return '联营'
elif re.match(search, url, re.M | re.I):
return '搜索'
elif re.match(ic, url, re.M | re.I):
return 'IC'
elif re.match(info, url, re.M | re.I):
return '资讯'
else:
return '其他'
# accumulate spider counts per /24 IP section
def ip_section_analyzing(self, ip):
if re.match('([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)', ip, re.M | re.I):
ip_arr = ip.split('.')
new_ip = '%s.%s.%s' % (ip_arr[0], ip_arr[1], ip_arr[2])
if new_ip not in self.ip_section:
self.ip_section[new_ip] = 1
else:
self.ip_section[new_ip] += 1
def pipeline_lx_log(self):
self.merge_file()
data = self.get_hdfs_data()
self.wash_data(data)
def pipeline_ly_log(self):
self.merge_ly_file()
data = self.get_ly_hdfs_data()
self.wash_ly_data(data)
def insert_seo_url(self, url, status, time_ts, table):
sql = "INSERT INTO %s (url,status,cal_ts,spider_time) VALUES (\'%s\',\'%s\',\'%s\',%d)" % (table,url,status,self.day,time_ts)
DBHandler.insert(self.db, sql)
def insert_seo_status(self, status_count, table):
for status in status_count:
count = status_count[status]
sql = "INSERT INTO %s (status,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (table, status,count,self.day)
DBHandler.insert(self.db, sql)
def insert_seo_daily(self, page_count):
for page in page_count:
count = page_count[page]
sql = "INSERT INTO lie_seo_spider_daily (page,count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (page,count,self.day)
DBHandler.insert(self.db, sql)
def insert_seo_ip_section(self, ip_section):
for ip in ip_section:
count = ip_section[ip]
sql = "INSERT INTO lie_seo_spider_ip_section (ip_section,spider_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
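A hedged usage sketch for the two log pipelines above; both are intended as daily jobs run after the nginx logs land on HDFS:

from pipeline.pi_lx_log import PiLxLog
pi = PiLxLog()
pi.pipeline_lx_log()   # merge HDFS logs, parse Baiduspider rows, write SEO stats
pi.pipeline_ly_log()   # same flow for the ly logs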
import re
import subprocess
import requests
import json
import time
import multiprocessing as mp
from config.conn_list import ConnList
from utils.db_handler import DBHandler
from utils.date_handler import DateHandler
from concurrent.futures import ThreadPoolExecutor
db = ConnList.Dashboard()
def deal_data(url, tid, dur, table):
time.sleep(dur)
r = requests.get(url)
res = r.json()
included_status = res['has_included']
included_time = res['included_time']
now_ts = DateHandler.now_datetime()
# parse the inclusion time only when the page is included and the timestamp format matches
if included_status == 1 and re.match('([0-9]+)年([0-9]+)月([0-9]+)日 ([0-9]+):([0-9]+):([0-9]+)', included_time, re.M | re.I):
included_time = DateHandler.str_to_unix(included_time, "%Y年%m月%d日 %H:%M:%S")
else:
included_time = 0
sql = "UPDATE %s SET is_include=%d,include_time=%d,update_time=%d WHERE id=%d" % (table, included_status, included_time, now_ts, tid)
DBHandler.update(db, sql)
class PiLxLogUrl:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.db = ConnList.Dashboard()
self.day = DateHandler.now_date(1, 1)
self.ip_section = {}
self.post_url = "http://127.0.0.1:12588/seocheck"
self.executor = ThreadPoolExecutor(max_workers=20)
def del_file(self):
cmd = "rm -f /data3/hdfs_data/spider_url/url.txt"
subprocess.getoutput(cmd)
def read_file(self):
now_ts = DateHandler.now_datetime()
with open('/data3/hdfs_data/spider_url/url.txt', 'r', encoding='utf-8') as f:
line = f.readline()
while line:
try:
arr = line.split('|')
url = self.reg_url(arr[0])
time = int(arr[1])
ip = arr[2]
line = f.readline()
sql = "INSERT INTO lie_seo_bd_record (url,spider_time,is_include,include_time,update_time,ip) VALUES (\'%s\',%d,%d,%d,%d,\'%s\')" % (url,time,0,0,now_ts,ip)
DBHandler.insert(self.db, sql)
except:
pass
def reg_url(self, url):
# strip the query string; return the URL as-is when it has none (previously returned None)
return url.split('?')[0]
def read_data(self):
sql = "SELECT id,url,ip,is_include FROM lie_seo_bd_record"
data = DBHandler.read(self.db, sql)
return data
def include_baidu(self, data):
p1 = mp.Pool(processes=10)
for row in data:
tid = row[0]
url = row[1]
url = "http://127.0.0.1:12588/seocheck?target=www.ichunt.com" + url
p1.apply_async(deal_data, (url, tid, 0.1, 'lie_seo_bd_record'))
p1.close()
p1.join()
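# Note (assumption about intent): deal_data sleeps `dur` seconds before each
# request, so 10 workers with dur=0.1 issue at most ~10 requests per burst;
# tune `processes` and `dur` together to throttle the seocheck service.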
# inclusion analysis
def include_analyzing(self, data):
for row in data:
ip = row[2]
is_include = row[3]
if (re.match('([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)', ip, re.M | re.I)) and (is_include == 1):
ip_arr = ip.split('.')
new_ip = '%s.%s.%s' % (ip_arr[0], ip_arr[1], ip_arr[2])
if new_ip not in self.ip_section:
self.ip_section[new_ip] = 1
else:
self.ip_section[new_ip] += 1
# inclusion analysis for designated pages
def include_appoint_analyzing(self):
sql = "SELECT id,url FROM lie_seo_appoint_record"
result = DBHandler.read(self.db, sql)
for row in result:
tid = row[0]
url = "http://127.0.0.1:12588/seocheck?target=" + row[1]
deal_data(url, tid, 0.1, 'lie_seo_appoint_record')
# insert inclusion stats
def insert_seo_ip_section(self):
for ip in self.ip_section:
count = self.ip_section[ip]
sql = "INSERT INTO lie_seo_include_ip_section (ip_section,include_count,cal_ts) VALUES (\'%s\',\'%s\',\'%s\')" % (ip,count,self.day)
DBHandler.insert(self.db, sql)
def pipeline_lx_log_url(self):
self.read_file()
data = self.read_data()
self.include_baidu(data)
self.include_analyzing(data)
self.include_appoint_analyzing()
self.insert_seo_ip_section()
self.del_file()
from extract.ex_market import ExMarket
from translate.ts_base import TsBase
from translate.ts_order import TsOrder
class PiMarket:
@staticmethod
def pipeline_lottery():
# --- lottery data ---
where = {'condition': ['lottery_id = 61']}
ex_l = ExMarket('抽奖数据')
lottery_data = ex_l.ex_prize_log(where)
# transform
ts = TsBase('抽奖转换', lottery_data)
print('抽奖人数', ts.no_repeat_people())
print('抽奖次数', ts.return_times())
@staticmethod
def pipeline_coupon_order():
# --- coupon data ---
where = {'condition': ['c.coupon_id in (243,244,245)', 'o.order_amount is not NULL']}
ex_l = ExMarket('抽奖数据')
coupon = ex_l.ex_user_coupon(where)
print(coupon)
ts = TsOrder('下单概览转换', coupon)
print(ts.return_times(), ts.trans_order_target(6.9))