"""Pull and refresh the latest JD POP shop data.

The script runs three stages against the ``liexin_data_distribution``
MySQL database: category sync (``lie_shop_class``), attribute sync
(``lie_shop_attr``) and unit / default-unit clean-up.

Usage::

    python3 crontab_pop.py prod
    python3 crontab_pop.py dev
"""
import json
import sys
import time
from contextlib import closing

import pymysql

from common import request_to_jd_pop

# Key-issue notes (the original note text here was left incomplete).

# NOTE(review): database, redis and API credentials are committed in source.
# They are kept verbatim so behaviour is unchanged; consider moving them to
# environment variables or a secrets store.
config_dev = {
    "mysql": {
        "host": "192.168.1.238",
        "port": 3306,
        "user": "liexin_data_distribution",
        "password": "liexin_data_distribution#zsyM",
        "database": "liexin_data_distribution"
    },
    "redis": {
        "host": "192.168.1.235",
        "port": 6379,
        "password": "icDb29mLy2s"
    }
}

config_prod = {
    "mysql": {
        "host": "bigdata2.ichunt.db",
        "port": 3306,
        "user": "DsbuUx",
        "password": "sfj09JjsfpQx0",
        "database": "liexin_data_distribution"
    },
    "redis": {
        "host": "172.18.137.38",
        "port": 6379,
        "password": "icDb29mLy2s"
    }
}

# JD POP API credentials, shared by every request this script makes.
_JD_TOKEN = "74ee3f2fa54d489da0d1b94ca83c9bc7lztn"
_JD_APP_KEY = "CCE6563827D13EDF4769C01E7429378D"
_JD_APP_SECRET = "98c72374c5b045529879786fc4e5c20e"


def _connect(conf: dict):
    """Open a MySQL connection from the ``conf["mysql"]`` settings."""
    mysql_conf = conf["mysql"]
    return pymysql.connect(host=mysql_conf["host"],
                           port=mysql_conf["port"],
                           user=mysql_conf["user"],
                           password=mysql_conf["password"],
                           database=mysql_conf["database"])


def update_pop_class(conf: dict) -> None:
    """Insert POP categories returned by the JD API that are not stored yet.

    Existing rows (matched on ``platform = 3`` and ``class_id``) are left
    untouched; each insert is committed and echoed to stdout.

    Args:
        conf: Environment configuration containing a ``"mysql"`` section.
    """
    method_name = "jingdong.vender.category.getFullValidCategoryResultByVenderId"
    ans = request_to_jd_pop(method_name, _JD_TOKEN, _JD_APP_KEY, _JD_APP_SECRET, {})
    categories = ans["jingdong_vender_category_getFullValidCategoryResultByVenderId_responce"]["returnType"]["list"]

    select_sql = "select id from lie_shop_class where platform = 3 and class_id = %s"
    insert_sql = "insert into lie_shop_class (parent_id, class_id, class_name, depth, status, platform, create_time) values (%s, %s, %s, %s, %s, %s, %s)"

    # closing() guarantees the cursor and connection are released even if a
    # query or a malformed API record raises.
    with closing(_connect(conf)) as db, closing(db.cursor()) as cursor:
        for category in categories:
            parent_id = category["fid"]
            class_id = category["id"]
            class_name = category["name"]
            depth = category["lev"]
            status = 1
            platform = 3
            create_time = int(time.time())

            cursor.execute(query=select_sql, args=(class_id,))
            if cursor.fetchall():
                continue  # category already stored

            row = (parent_id, class_id, class_name, depth, status, platform, create_time)
            cursor.execute(query=insert_sql, args=row)
            db.commit()
            print(insert_sql % row)


def _enum_value_json(api_attr: dict) -> str:
    """Return the JSON list of selectable values for input types 1 and 2, else ``"[]"``."""
    if api_attr["inputType"] not in (1, 2):
        return "[]"
    options = [
        {"attr_value_id": value["id"], "attr_value_name": value["name"]}
        for value in api_attr["attrValueList"]
    ]
    return json.dumps(options, ensure_ascii=False)


def _unit_json(api_attr: dict) -> str:
    """Return the JSON unit list of an input-type-10 attribute, else ``""``."""
    unit = ""
    if api_attr["inputType"] != 10:
        return unit
    for feature in api_attr["features"]:
        if feature["key"] == "combinationValue":
            # fvalue is a JSON array; the units live on its first element.
            # No break: the last matching feature wins, as before.
            combination = json.loads(feature["fvalue"])
            unit = json.dumps(combination[0]["unit"], ensure_ascii=False)
    return unit


def _find_stored_attr(stored_rows, attr_id, attr_name):
    """Return the first stored row sharing ``attr_id`` or ``attr_name``, or ``None``."""
    for row in stored_rows:
        if row[2] == attr_id or row[3] == attr_name:
            return row
    return None


def _sync_class_attrs(db, cursor, class_id) -> None:
    """Synchronise ``lie_shop_attr`` for one category with the JD API.

    New attributes are inserted, changed ones are updated (and flagged
    ``is_mapping = 0``), and stored attributes whose id and name are both
    absent from the API response are disabled (``status = 0``).
    """
    method_name = "jingdong.category.read.findAttrsByCategoryIdUnlimitCate"
    param = {
        "cid": class_id,
        "field": "attrValueList"
    }

    # Load the category's active attributes up front.
    # NOTE(review): only status = 1 rows are loaded, so an attribute that was
    # disabled earlier and shows up again in the API is inserted as a new row.
    check_sql = "select id, class_id, attr_id, attr_name, unit, enum_value, input_type, is_required from lie_shop_attr where platform = 3 and status = 1 and class_id = %s"
    cursor.execute(query=check_sql, args=(class_id,))
    stored_rows = cursor.fetchall()

    ans = request_to_jd_pop(method_name, _JD_TOKEN, _JD_APP_KEY, _JD_APP_SECRET, param)
    api_attrs = ans["jingdong_category_read_findAttrsByCategoryIdUnlimitCate_responce"]["findattrsbycategoryidunlimitcate_result"]

    update_sql = "update lie_shop_attr set attr_id = %s, attr_name = %s, unit = %s, enum_value = %s, input_type = %s, is_required = %s, update_time = %s , is_mapping = 0 where id = %s"
    insert_sql = "insert into lie_shop_attr (class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"

    api_attr_ids = set()
    api_attr_names = set()
    for api_attr in api_attrs:
        input_type = api_attr["inputType"]
        attr_id = api_attr["id"]
        attr_name = api_attr["name"]
        is_required = 1 if api_attr["isRequired"] else 0
        status = 1
        platform = 3
        api_attr_ids.add(attr_id)
        api_attr_names.add(attr_name)
        create_time = int(time.time())
        update_time = int(time.time())

        enum_value = _enum_value_json(api_attr)
        unit = _unit_json(api_attr)

        # Same column order as stored_rows[2:8], so the two compare directly.
        current = (attr_id, attr_name, unit, enum_value, input_type, is_required)
        stored = _find_stored_attr(stored_rows, attr_id, attr_name)

        if stored is None:
            row = (class_id,) + current + (status, platform, create_time)
            cursor.execute(query=insert_sql, args=row)
            db.commit()
            print(insert_sql % row)
        elif tuple(stored[2:8]) != current:
            # Any differing field triggers a full update of the row.
            row = current + (update_time, stored[0])
            cursor.execute(query=update_sql, args=row)
            db.commit()
            print(update_sql % row)

    # Disable stored attributes that the API no longer returns.
    delete_sql = "update lie_shop_attr set status = 0 where id = %s"
    for stored in stored_rows:
        row_id = stored[0]
        if stored[2] not in api_attr_ids and stored[3] not in api_attr_names:
            cursor.execute(query=delete_sql, args=(row_id,))
            db.commit()
            print(delete_sql % row_id)


def update_pop_attr(conf: dict) -> None:
    """Refresh the attributes of every stored POP category from the JD API.

    Args:
        conf: Environment configuration containing a ``"mysql"`` section.
    """
    select_sql = "select class_id from lie_shop_class where platform = 3"
    with closing(_connect(conf)) as db, closing(db.cursor()) as cursor:
        cursor.execute(query=select_sql)
        for class_row in cursor.fetchall():
            _sync_class_attrs(db, cursor, class_row[0])


def _first_unit(unit):
    """Return the first unit of a JSON-encoded unit list.

    Falls back to the historical string slicing when ``unit`` is not a JSON
    list whose first element is a string, so previously accepted values keep
    producing the same result.
    """
    try:
        units = json.loads(unit)
    except (TypeError, ValueError):
        units = None
    if isinstance(units, list) and units and isinstance(units[0], str):
        return units[0]
    end = str(unit).find(",")
    return unit[2:end - 1]


def handle_vc_unit(conf: dict) -> None:
    """Populate ``default_unit`` for every POP attribute that has a unit.

    For ``input_type`` 10 the stored unit is a JSON list and its first entry
    becomes the default; for every other type the unit is copied as is.

    Args:
        conf: Environment configuration containing a ``"mysql"`` section.
    """
    select_sql = "select id, unit, input_type from lie_shop_attr where platform = 3 and unit != ''"
    update_sql = "update lie_shop_attr set default_unit = %s where id = %s"
    with closing(_connect(conf)) as db, closing(db.cursor()) as cursor:
        cursor.execute(query=select_sql)
        for row in cursor.fetchall():
            row_id, unit, input_type = row[0], row[1], row[2]
            default_unit = _first_unit(unit) if input_type == 10 else unit
            cursor.execute(query=update_sql, args=(default_unit, row_id,))
            db.commit()
            print(update_sql % (default_unit, row_id))


def handle_default_unit(conf: dict) -> None:
    """Clear ``default_unit`` on POP attributes whose ``unit`` is empty.

    Args:
        conf: Environment configuration containing a ``"mysql"`` section.
    """
    select_sql = "select id, unit from lie_shop_attr where platform = 3 and unit = '' and default_unit != ''"
    update_sql = "update lie_shop_attr set default_unit = '' where id = %s"
    with closing(_connect(conf)) as db, closing(db.cursor()) as cursor:
        cursor.execute(query=select_sql)
        for row in cursor.fetchall():
            row_id = row[0]
            cursor.execute(query=update_sql, args=(row_id,))
            db.commit()
            print(update_sql % (row_id))


def handle_unit_from_attr_name(conf: dict) -> None:
    """Derive units from parenthesised text in POP attribute names.

    Only attributes with ``input_type != 10`` and an empty unit are examined.
    The text between the first ``(`` and the first ``)`` is stored as both
    ``unit`` and ``default_unit``; names without a usable pair are reported
    and skipped.

    Args:
        conf: Environment configuration containing a ``"mysql"`` section.
    """
    select_sql = "select id, attr_name from lie_shop_attr where platform = 3 and input_type != 10 and unit = ''"
    update_sql = "update lie_shop_attr set unit = %s, default_unit = %s where id = %s"
    with closing(_connect(conf)) as db, closing(db.cursor()) as cursor:
        cursor.execute(query=select_sql)
        for row in cursor.fetchall():
            row_id, attr_name = row[0], row[1]
            start = attr_name.find("(")
            end = attr_name.find(")")
            # start == 0 means the whole name is parenthesised, which is not
            # treated as a unit suffix.
            if start == -1 or end == -1 or start >= end or start == 0:
                print(attr_name + " can't match!!")
                continue
            unit_str = attr_name[start + 1:end]
            cursor.execute(query=update_sql, args=(unit_str, unit_str, row_id,))
            db.commit()
            print(update_sql % (unit_str, unit_str, row_id))


def main() -> None:
    """Select the environment from ``sys.argv[1]`` and run every sync stage."""
    configs = {"dev": config_dev, "prod": config_prod}
    if len(sys.argv) < 2 or sys.argv[1] not in configs:
        print("命令选项:\n线上:python3 crontab_pop.py prod\n测试:python3 crontab_pop.py dev\n")
        sys.exit(1)
    config = configs[sys.argv[1]]

    update_pop_class(config)
    print("======================> pop分类已更新")
    update_pop_attr(config)
    print("======================> pop参数已更新")
    handle_vc_unit(config)
    handle_default_unit(config)
    handle_unit_from_attr_name(config)
    print("======================> pop单位处理完毕")


if __name__ == '__main__':
    main()