Commit 595562b0 by 刘豪

更改ts_crm逻辑

parent 197615c8
...@@ -26,7 +26,7 @@ if __name__ == '__main__': ...@@ -26,7 +26,7 @@ if __name__ == '__main__':
PiGoods.pipeline_zm_warn() PiGoods.pipeline_zm_warn()
if sys.argv[1] == 'safe_stock': if sys.argv[1] == 'safe_stock':
PiPurchase.pipeline_safe_stock() PiPurchase.pipeline_safe_stock()
if sys.argv[1] == 'search_no_r': if sys.argv[1] == 'search_no_r': # 每日搜索关键词数据
PiSearch.search_no_result() PiSearch.search_no_result()
if sys.argv[1] == 'sku_expose': if sys.argv[1] == 'sku_expose':
# 获取HDFS数据 # 获取HDFS数据
......
...@@ -87,8 +87,7 @@ class ConnList(Base): ...@@ -87,8 +87,7 @@ class ConnList(Base):
@staticmethod @staticmethod
def Credit(): def Credit():
conf = credit_server conf = credit_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'], return pymysql.connect(**conf)
conf['db_name'], charset='utf8')
@staticmethod @staticmethod
def CreditSource(): def CreditSource():
...@@ -176,8 +175,7 @@ class ConnList(Base): ...@@ -176,8 +175,7 @@ class ConnList(Base):
@staticmethod @staticmethod
def WrCrm(): def WrCrm():
conf = wr_crm_server conf = wr_crm_server
return pymysql.connect(str(conf['host']), conf['user'], conf['password'], return pymysql.connect(**conf)
conf['db_name'], charset='utf8')
@staticmethod @staticmethod
def LxDb(): def LxDb():
......
...@@ -166,14 +166,16 @@ wr_crm_server = { ...@@ -166,14 +166,16 @@ wr_crm_server = {
'host': '172.18.137.21', 'host': '172.18.137.21',
'user': 'LxCrmUser', 'user': 'LxCrmUser',
'password': 'xUTmu0XsdUqoZIim2y', 'password': 'xUTmu0XsdUqoZIim2y',
'db_name': 'liexin_crm' 'db': 'liexin_crm',
'charset': 'utf8'
} }
credit_server = { credit_server = {
'host': 'fkdb-master.ichunt.cc', 'host': 'fkdb-master.ichunt.cc',
'user': 'Cdimz200o', 'user': 'Cdimz200o',
'password': 'mLssyDxmsySZmBomy', 'password': 'mLssyDxmsySZmBomy',
'db_name': 'liexin_credit' 'db': 'liexin_credit',
'charset': 'utf8'
} }
credit_source_server = { credit_source_server = {
......
...@@ -582,24 +582,12 @@ class PiDaily: ...@@ -582,24 +582,12 @@ class PiDaily:
start_time = 0 start_time = 0
end_time = DateHandler.now_datetime() end_time = DateHandler.now_datetime()
# db = ConnList.Order()
lx_order_db = ConnList.LxOrderDb() lx_order_db = ConnList.LxOrderDb()
wr_db = ConnList.WrCrm() wr_db = ConnList.WrCrm()
user = ExCrm('').crm_user(start_time, end_time) user = ExCrm('').crm_user(start_time, end_time)
ts_user = TsCrm(lx_order_db, wr_db, user) ts_user = TsCrm(lx_order_db, wr_db, user)
print(len(user)) ts_user.handle_data()
ts_user.is_order()
ts_user.last_order()
ts_user.order_num()
ts_user.model_num()
ts_user.order_amount()
ts_user.paid_amount()
ts_user.contact_info()
ts_user.iap_data()
print(len(user))
import traceback import traceback
from utils.db_handler import DBHandler import pymysql
# from utils.db_handler import DBHandler
from utils.date_handler import DateHandler from utils.date_handler import DateHandler
from extract.ex_erp import ExERP from extract.ex_erp import ExERP
...@@ -10,153 +12,210 @@ class TsCrm: ...@@ -10,153 +12,210 @@ class TsCrm:
def __init__(self, db, wr_db, data): def __init__(self, db, wr_db, data):
self.dd = {} self.dd = {}
self.db = db self.db = db
self.cursor = self.db.cursor(cursor=pymysql.cursors.DictCursor)
self.wr_db = wr_db self.wr_db = wr_db
self.wr_db_cursor = self.wr_db.cursor()
self.data = data self.data = data
self.exchange = ExERP(DateHandler.now_date(0, 1)).get_erp_exchange() self.exchange = ExERP(DateHandler.now_date(0, 1)).get_erp_exchange()
self.init()
def init(self): def __del__(self):
for row in self.data: if self.cursor:
outter_uid = row[0] self.cursor.close()
user_id = row[1] if self.wr_db_cursor:
self.wr_db_cursor.close()
def read(self, cursor, db, sql):
results = {}
try:
cursor.execute(sql)
results = cursor.fetchall()
except:
db.rollback()
return results
def update(self, cursor, db, sql):
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
traceback.print_exc()
def insert(self, cursor, db, sql):
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
traceback.print_exc()
def delete(self, cursor, db, sql):
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
traceback.print_exc()
def handle_data(self):
num = 100
data = [self.data[i:i + num] for i in range(0, len(self.data), num)]
for row in data:
outter_uids = []
str_outter_uids_list = []
for i in row:
outter_uid = i[0]
user_id = i[1]
self.dd[outter_uid] = { self.dd[outter_uid] = {
'user_id': user_id, 'user_id': user_id,
'outter_uid': outter_uid,
'is_order': 0, 'is_order': 0,
'last_order_time': 0, 'last_order_time': 0,
'order_num': 0, 'order_num': 0,
'model_num': 0, 'model_num': 0,
'order_amount': 0, 'order_amount': float(0),
'paid_amount': 0, 'paid_amount': 0,
'contact': '', 'contact': '',
'contact_info': '' 'contact_info': ''
} }
outter_uids.append(outter_uid)
def is_order(self): str_outter_uids_list.append(str(outter_uid))
for outter_uid in self.dd: str_outter_uids = ",".join(str_outter_uids_list)
try:
sql = "SELECT 1 FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1" % outter_uid for outter_uid in outter_uids:
order = DBHandler.read(self.db, sql) user_ids = self.is_order(str_outter_uids)
if (len(order) > 0): if outter_uid in user_ids:
self.dd[outter_uid]['is_order'] = 2 self.dd[outter_uid]["is_order"] = 2
else: else:
self.dd[outter_uid]['is_order'] = 1 self.dd[outter_uid]['is_order'] = 1
except:
self.dd[outter_uid]['is_order'] = 0
def last_order(self):
for outter_uid in self.dd:
try:
sql = "SELECT create_time FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1 ORDER BY order_id DESC LIMIT 1" % outter_uid
order = DBHandler.read(self.db, sql)
last_order_time = order[0][0]
self.dd[outter_uid]['last_order_time'] = last_order_time
except:
self.dd[outter_uid]['last_order_time'] = 0
def order_num(self):
for outter_uid in self.dd:
try:
sql = "SELECT count(order_id) FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1 AND status = 10" % outter_uid sql = "SELECT count(order_id) FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1 AND status = 10" % outter_uid
order = DBHandler.read(self.db, sql) order = self.read(self.cursor, self.db, sql)
order_num = order[0][0] self.dd[outter_uid]['order_num'] = order[0]["count(order_id)"]
self.dd[outter_uid]['order_num'] = order_num
except:
self.dd[outter_uid]['order_num'] = 0
def model_num(self):
for outter_uid in self.dd:
try:
sql = "SELECT count(i.goods_id) FROM lie_order_items i LEFT JOIN lie_order o ON i.order_id = o.order_id WHERE o.user_id = %d AND i.status = 1 AND o.is_type = 0 AND o.order_type = 1 AND o.status = 10" % outter_uid sql = "SELECT count(i.goods_id) FROM lie_order_items i LEFT JOIN lie_order o ON i.order_id = o.order_id WHERE o.user_id = %d AND i.status = 1 AND o.is_type = 0 AND o.order_type = 1 AND o.status = 10" % outter_uid
model = DBHandler.read(self.db, sql) model = self.read(self.cursor, self.db, sql)
model_num = model[0][0] self.dd[outter_uid]['model_num'] = model[0]["count(i.goods_id)"]
self.dd[outter_uid]['model_num'] = model_num self.last_order(str_outter_uids)
except: self.order_amount(str_outter_uids)
self.dd[outter_uid]['model_num'] = 0 self.paid_amount(str_outter_uids)
self.contact_info(str_outter_uids)
def order_amount(self): self.iap_data(outter_uids)
for outter_uid in self.dd:
def is_order(self, str_outter_uids):
user_ids = []
try:
# sql = "SELECT 1 FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1" % outter_uid
sql = "SELECT user_id FROM lie_order WHERE user_id in (%s) AND is_type = 0 AND order_type = 1" % str_outter_uids
order = self.read(self.cursor, self.db, sql)
user_ids = list(set([o["user_id"] for o in order]))
except Exception as e:
print(e)
return user_ids
def last_order(self, str_outter_uids):
try: try:
sum_amount = 0 # sql = "SELECT create_time FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1 ORDER BY order_id DESC LIMIT 1" % outter_uid
sql = "SELECT order_amount,currency FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1" % outter_uid sql = "SELECT user_id, create_time FROM lie_order WHERE user_id in (%s) AND is_type = 0 AND order_type = 1 ORDER BY order_id DESC" % str_outter_uids
order = DBHandler.read(self.db, sql) order = self.read(self.cursor, self.db, sql)
for od in order: for o in order:
amount = float(od[0]) if o["create_time"] > self.dd[o["user_id"]]['last_order_time']:
currency = od[1] self.dd[o["user_id"]]['last_order_time'] = o["create_time"]
except Exception as e:
print(e)
#
#
# def order_num(self, outter_uids):
# for outter_uid in outter_uids:
# try:
# sql = "SELECT count(order_id) FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1 AND status = 10" % outter_uid
# order = self.read(self.cursor, self.db, sql)
# self.dd[outter_uid]['order_num'] = order[0]["count(order_id)"]
# except Exception as e:
# print(e)
# def model_num(self):
# for outter_uid in self.dd:
# try:
# sql = "SELECT count(i.goods_id) FROM lie_order_items i LEFT JOIN lie_order o ON i.order_id = o.order_id WHERE o.user_id = %d AND i.status = 1 AND o.is_type = 0 AND o.order_type = 1 AND o.status = 10" % outter_uid
# model = DBHandler.read(self.db, sql)
# model_num = model[0][0]
# self.dd[outter_uid]['model_num'] = model_num
# except:
# self.dd[outter_uid]['model_num'] = 0
def order_amount(self, str_outter_uids):
try:
# sql = "SELECT order_amount,currency FROM lie_order WHERE user_id = %d AND is_type = 0 AND order_type = 1" % outter_uid
sql = "SELECT user_id, order_amount,currency FROM lie_order WHERE user_id in (%s) AND is_type = 0 AND order_type = 1" % str_outter_uids
order = self.read(self.cursor, self.db, sql)
for o in order:
amount = float(o["order_amount"])
currency = o["currency"]
if currency == 2: if currency == 2:
amount = self.exchange * amount amount = self.exchange * amount
sum_amount += amount self.dd[o["user_id"]]['order_amount'] += amount
self.dd[outter_uid]['order_amount'] = sum_amount except Exception as e:
except: print(e)
self.dd[outter_uid]['order_amount'] = 0
def paid_amount(self): def paid_amount(self, str_outter_uids):
for outter_uid in self.dd:
try: try:
sum_amount = 0 # sql = "SELECT p.pay_amount,o.currency FROM lie_order o LEFT JOIN lie_pay_log p ON o.order_id = p.order_id WHERE o.is_type = 0 AND o.order_type = 1 AND o.status > 2 AND p.is_paid = 1 AND o.user_id = %d" % outter_uid
sql = "SELECT p.pay_amount,o.currency FROM lie_order o LEFT JOIN lie_pay_log p ON o.order_id = p.order_id WHERE o.is_type = 0 AND o.order_type = 1 AND o.status > 2 AND p.is_paid = 1 AND o.user_id = %d" % outter_uid sql = "SELECT p.user_id, p.pay_amount,o.currency FROM lie_order o LEFT JOIN lie_pay_log p ON o.order_id = p.order_id WHERE o.is_type = 0 AND o.order_type = 1 AND o.status > 2 AND p.is_paid = 1 AND o.user_id in (%s)" % str_outter_uids
order = DBHandler.read(self.db, sql) order = self.read(self.cursor, self.db, sql)
for od in order: for o in order:
amount = float(od[0]) amount = float(o["pay_amount"])
currency = od[1] currency = o["currency"]
if currency == 2: if currency == 2:
amount = self.exchange * amount amount = self.exchange * amount
sum_amount += amount self.dd[o["user_id"]]['paid_amount'] += amount
self.dd[outter_uid]['paid_amount'] = sum_amount except Exception as e:
except: print(e)
self.dd[outter_uid]['paid_amount'] = 0
def contact_info(self): def contact_info(self, str_outter_uids):
for outter_uid in self.dd:
try: try:
sql = "SELECT consignee,mobile FROM lie_order_address a LEFT JOIN lie_order o ON o.order_id = a.order_id WHERE o.user_id = %d AND o.is_type = 0 AND o.order_type = 1 ORDER BY o.order_id DESC LIMIT 1" % outter_uid sql = "SELECT user_id,consignee,mobile,o.order_id FROM lie_order_address a LEFT JOIN lie_order o ON o.order_id = a.order_id WHERE o.user_id in (%s) AND o.is_type = 0 AND o.order_type = 1 ORDER BY o.order_id DESC" % str_outter_uids
contact = DBHandler.read(self.db, sql) contact = self.read(self.cursor, self.db, sql)
consignee = contact[0][0] data ={}
mobile = contact[0][1] for c in contact:
self.dd[outter_uid]['contact'] = consignee if data.get(c["user_id"]) is None:
self.dd[outter_uid]['contact_info'] = mobile data[c["user_id"]] = 0
except: if c["order_id"] > data[c["user_id"]]:
self.dd[outter_uid]['contact'] = '' data[c["user_id"]] = c["order_id"]
self.dd[outter_uid]['contact_info'] = '' self.dd[c["user_id"]]['contact'] = c["consignee"]
self.dd[c["user_id"]]['contact_info'] = c["mobile"]
def iap_data(self): except Exception as e:
print(e)
def iap_data(self, outter_uids):
create_time = DateHandler.date_time(0) create_time = DateHandler.date_time(0)
for outter_uid in self.dd: for outter_uid in outter_uids:
try: try:
dd = self.dd[outter_uid] self.dd[outter_uid]['create_time'] = create_time
dd['create_time'] = create_time
dd['outter_uid'] = outter_uid
sql = "SELECT 1 FROM lie_user_extend WHERE outter_uid = %d" % outter_uid sql = "SELECT 1 FROM lie_user_extend WHERE outter_uid = %d" % outter_uid
is_exist = DBHandler.read(self.wr_db, sql) is_exist = self.read(self.wr_db_cursor, self.wr_db, sql)
if (len(is_exist) > 0): if len(is_exist) > 0:
self.update_data(dd) self.update_data(self.dd[outter_uid])
else: else:
self.insert_data(dd) self.insert_data(self.dd[outter_uid])
if (dd['is_order'] == 2):
self.update_isValue(dd['user_id'])
if self.dd[outter_uid]['is_order'] == 2:
self.update_isValue(self.dd[outter_uid]['user_id'])
except: except:
traceback.print_exc() traceback.print_exc()
def insert_data(self, dd): def insert_data(self, dd):
sql = "INSERT INTO lie_user_extend (user_id,outter_uid,no_create_order,latest_order_time,completed_order_nums,model_nums,total_order_amount,amount_paid,contact,contact_info,create_time) \ sql = "INSERT INTO lie_user_extend (user_id,outter_uid,no_create_order,latest_order_time,completed_order_nums,model_nums,total_order_amount,amount_paid,contact,contact_info,create_time) \
VALUES (%d, %d, %d, %d, %d, %d, %.2f, %.2f, \'%s\',\'%s\', %d)" % \ VALUES (%d, %d, %d, %d, %d, %d, %.2f, %.2f, \'%s\',\'%s\', %d)" % \
(dd['user_id'], dd['outter_uid'], dd['is_order'], dd['last_order_time'], dd['order_num'], dd['model_num'], dd['order_amount'], dd['paid_amount'], dd['contact'], dd['contact_info'], dd['create_time']) (dd['user_id'], dd['outter_uid'], dd['is_order'], dd['last_order_time'], dd['order_num'], dd['model_num'],
DBHandler.insert(self.wr_db, sql) dd['order_amount'], dd['paid_amount'], dd['contact'], dd['contact_info'], dd['create_time'])
self.insert(self.wr_db_cursor, self.wr_db, sql)
def update_data(self, dd): def update_data(self, dd):
sql = "UPDATE lie_user_extend SET no_create_order=%d,latest_order_time=%d,completed_order_nums=%d,model_nums=%d,total_order_amount=%.2f,amount_paid=%.2f,contact=\'%s\',contact_info=\'%s\' WHERE outter_uid=%d" % \ sql = "UPDATE lie_user_extend SET no_create_order=%d,latest_order_time=%d,completed_order_nums=%d,model_nums=%d,total_order_amount=%.2f,amount_paid=%.2f,contact=\'%s\',contact_info=\'%s\' WHERE outter_uid=%d" % \
(dd['is_order'], dd['last_order_time'], dd['order_num'], dd['model_num'], dd['order_amount'], dd['paid_amount'], dd['contact'], dd['contact_info'], dd['outter_uid']) (dd['is_order'], dd['last_order_time'], dd['order_num'], dd['model_num'], dd['order_amount'],
DBHandler.update(self.wr_db, sql) dd['paid_amount'], dd['contact'], dd['contact_info'], dd['outter_uid'])
self.update(self.wr_db_cursor,self.wr_db, sql)
def update_isValue(self, user_id): def update_isValue(self, user_id):
sql = "UPDATE lie_user SET is_value=1 WHERE user_id=%d" % user_id sql = "UPDATE lie_user SET is_value=1 WHERE user_id=%d" % user_id
DBHandler.update(self.wr_db, sql) self.update(self.wr_db_cursor,self.wr_db, sql)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment