# 主要用于更新db中对应分类下的参数属性 # 分类数据来源于lie_shop_class 数据库中class_id 字段或 parent_id 字段 import json import sys import time import pymysql from common import request_to_jd_vc import redis # TODO 通知基石平台告警 ??? 是不是按照原来的方式?? 用is_mapping 字段来进行标识??? config_dev = { "mysql": { "host": "192.168.1.238", "port": 3306, "user": "liexin_data_distribution", "password": "liexin_data_distribution#zsyM", "database": "liexin_data_distribution" }, "redis": { "host": "192.168.1.235", "port": 6379, "password": "icDb29mLy2s" } } config_prod = { "mysql": { "host": "bigdata2.ichunt.db", "port": 3306, "user": "DsbuUx", "password": "sfj09JjsfpQx0", "database": "liexin_data_distribution" }, "redis": { "host": "172.18.137.38", "port": 6379, "password": "icDb29mLy2s" } } def update_jd_attr_by_class(conf: dict): """ 基于lie_shop_attr中的class_id来更新属性 缺少的属性直接添加,改变的属性则进行update """ host = conf["mysql"]["host"] port = conf["mysql"]["port"] user = conf["mysql"]["user"] password = conf["mysql"]["password"] database = conf["mysql"]["database"] db = pymysql.connect(host=host, port=port, user=user, password=password, database=database) select_sql = "select distinct class_id from lie_shop_attr" cursor = db.cursor() cursor.execute(query=select_sql) results = cursor.fetchall() class_ids = [] for i in range(len(results)): class_ids.append(results[i][0]) # 请求京东平台 获取分类下的属性(vc接口) method_name = "jingdong.vc.item.props.find" token = "845ce8478b074103b9e78a769d5fa4831y2u" app_key = "CA52430E90209F51D8F5D7B615DDE9AD" app_secret = "c92691b2379c48de87e699c4c2f7fb32" for i in range(len(class_ids)): class_id = class_ids[i] param = { "category_leaf_id": class_id } ans = request_to_jd_vc(method_name=method_name, token=token, app_key=app_key, app_secret=app_secret, param=param) # 处理京东返回的数据 并尝试修改db if "result" not in ans["jingdong_vc_item_props_find_responce"]["jos_result_dto"]: continue jd_results = ans["jingdong_vc_item_props_find_responce"]["jos_result_dto"]["result"] for j in range(len(jd_results)): result = jd_results[j] props = result["props"] for k in range(len(props)): prop = props[k] # 对于 required 为false的数据, 直接跳过,不予处理 if not prop["required"]: continue # 对于 input_type 不同的数据 需要采用不同的处理方式。 # 检查这个属性id 是否已经存在(存在的情况会update, 不存在的情况会insert) attr_id = prop["id"] check_sql = "select id from lie_shop_attr where class_id = %s and attr_id = %s" cursor.execute(query=check_sql, args=(class_id, attr_id)) db_result = cursor.fetchall() attr_name = prop["name"] unit = prop["val_unit"] input_type = prop["input_type"] platform = 1 status = 1 is_required = 1 enum_value = "[]" if input_type == 1 or input_type == 2: # 单选或者多选 把选项取出来 tmp_value_list = [] values = prop["values"] for index in range(len(values)): attr_value_id = values[index]["id"] attr_value_name = values[index]["name"] item_map = { "attr_value_id": attr_value_id, "attr_value_name": attr_value_name, } tmp_value_list.append(item_map) enum_value = json.dumps(tmp_value_list, ensure_ascii=False) elif input_type == 3: # 文本类型 # 文本类型没有可选值 pass elif input_type == 7: # 数值类型 # 数值类型没有可选值 pass elif input_type == 10: # 多单位多解析的方式 # 多单位多解析 需要特殊处理单位字段 tmp_unit_list = [] values = prop["values"] for index in range(len(values)): if "units" in values[index]: unit_list = values[index]["units"] for x in range(len(unit_list)): tmp_unit_list.append(unit_list[x]) unit = json.dumps(tmp_unit_list, ensure_ascii=False) else: print("此input_type类型暂未兼容处理!请单独处理!") create_time = int(time.time()) # 如果db中暂时没有这条数据 就插入 if len(db_result) == 0: insert_sql = "insert into lie_shop_attr (class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" # 谨慎起见 先不插入 防止破坏数据库原数据 cursor.execute(query=insert_sql, args=(class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time, )) db.commit() print(insert_sql % (class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time)) # 把sql语句输出到一个文件中 else: # TODO 已经存在,考虑更新, 鉴于更新的情况复杂,稍后处理 pass def update_ext_attr_by_class_ids(conf: dict): """ 基于lie_shop_attr中的class_id来更新 “拓展” 属性 """ host = conf["mysql"]["host"] port = conf["mysql"]["port"] user = conf["mysql"]["user"] password = conf["mysql"]["password"] database = conf["mysql"]["database"] db = pymysql.connect(host=host, port=port, user=user, password=password, database=database) select_sql = "select distinct class_id from lie_shop_attr" cursor = db.cursor() cursor.execute(query=select_sql) results = cursor.fetchall() class_ids = [] for i in range(len(results)): class_ids.append(results[i][0]) # 请求京东平台 获取分类下的 “拓展属性”(vc接口) method_name = "jingdong.vc.item.extProps.find" token = "845ce8478b074103b9e78a769d5fa4831y2u" app_key = "CA52430E90209F51D8F5D7B615DDE9AD" app_secret = "c92691b2379c48de87e699c4c2f7fb32" class_attr_mapping = {} for i in range(len(class_ids)): class_id = class_ids[i] class_attr_mapping[class_id] = [] param = { "category_leaf_id": class_id, } ans = request_to_jd_vc(method_name=method_name, token=token, app_key=app_key, app_secret=app_secret, param=param) print("request to jd ....") if "result" not in ans["jingdong_vc_item_extProps_find_responce"]["jos_result_dto"]: continue jd_results = ans["jingdong_vc_item_extProps_find_responce"]["jos_result_dto"]["result"] for j in range(len(jd_results)): result = jd_results[j] # 若为非必填的属性 直接跳过 flag = result["is_required"] if flag != 1: continue attr_id = result["att_id"] class_attr_mapping[class_id].append(attr_id) attr_name = result["name"] unit = result["val_unit"] is_required = 1 status = 1 platform = 1 create_time = int(time.time()) input_type = result["input_type"] enum_value = "[]" # 根据 input_type 不同类型分别处理 if input_type == 1 or input_type == 2: # 单选或者多选,获取选项值 ext_prop_value = result["ext_prop_value"] tmp_list_value = [] for k in range(len(ext_prop_value)): value = ext_prop_value[k] attr_value_id = value["value_id"] attr_value_name = value["value_name"] item = { "attr_value_id": attr_value_id, "attr_value_name": attr_value_name } tmp_list_value.append(item) enum_value = json.dumps(tmp_list_value, ensure_ascii=False) elif input_type == 3: # 文本类型,无选项值 pass else: print("暂时未兼容此种类型的拓展属性 input_type: " + str(input_type)) # 检查此分类的此拓展属性在 db 中是否已存在,未存在则 insert, 存在则 update check_sql = "select id from lie_shop_attr where class_id = %s and attr_id = %s" cursor.execute(query=check_sql, args=(class_id, attr_id,)) db_result = cursor.fetchall() if len(db_result) == 0: # 不存在时 insert_sql = "insert into lie_shop_attr (class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" cursor.execute(query=insert_sql, args=(class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time,)) db.commit() print(insert_sql % (class_id, attr_id, attr_name, unit, enum_value, input_type, is_required, status, platform, create_time)) else: update_sql = "" insert_ext_attr_to_redis(class_attr_mapping, conf) def insert_ext_attr_to_redis(class_attr_map: dict, conf: dict): """ 拓展属性存到 redis中 key : jd_ext_attr field: 分类id value: 拓展属性id数组。 for example: jd_ext_attr 33811 [235914, 562124] class_attr_map 结构: { 33811: [235914, 562124], 33471: [1001074361, 236705] } """ host = conf["redis"]["host"] port = conf["redis"]["port"] password = conf["redis"]["password"] db = redis.Redis(host=host, port=port, password=password, decode_responses=True) for class_id in class_attr_map: key = str(class_id) attr_ids = class_attr_map[class_id] if len(attr_ids) == 0: # 为空的直接跳过 continue value = json.dumps(attr_ids, ensure_ascii=False) db.hset(name="jd_ext_attr", key=key, value=value) print("jd_ext_attr", key, value) if __name__ == '__main__': option = {"dev", "prod"} if len(sys.argv) >= 2 and sys.argv[1] in option: environment = sys.argv[1] else: sys.exit(1) config = config_dev if environment == "dev": config = config_dev elif environment == "prod": config = config_prod # update_jd_attr_by_class(conf=config) update_ext_attr_by_class_ids(conf=config)