Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
岳巧源
/
my-awesome-project
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
bcd55da6
authored
Jun 26, 2024
by
岳巧源
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
split to 3 scripts
parent
5be99361
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
392 additions
and
0 deletions
release_crm.py
release_liexin.py
release_pur.py
release_crm.py
0 → 100644
View file @
bcd55da6
import
json
import
mysql.connector
import
requests
import
logging
# 线上地址需要更改
dev_url
=
"http://united_data.liexindev.net/sync/Address/updateAddress"
#测试环境url
prod_url
=
""
#生产环境
#日志配置
logging
.
basicConfig
(
format
=
'
%(asctime)
s -
%(filename)
s[line:
%(lineno)
d] -
%(levelname)
s:
%(message)
s'
,
level
=
logging
.
INFO
,
filename
=
'data_crm.log'
,
filemode
=
'a'
)
def
get_user_detail_address_by_code
(
province
:
int
,
city
:
int
,
district
:
int
,
detail_address
:
str
)
->
str
:
db
=
mysql
.
connector
.
connect
(
host
=
"232.db.liexindev.me"
,
user
=
"liexin"
,
password
=
"liexin#zsyM"
,
database
=
"liexin"
)
sql1
=
"select region_name from lie_region where region_id = "
+
str
(
province
)
sql2
=
"select region_name from lie_region where region_id = "
+
str
(
city
)
sql3
=
"select region_name from lie_region where region_id = "
+
str
(
district
)
user_detail_address
=
""
cursor
=
db
.
cursor
()
# 省
cursor
.
execute
(
sql1
)
result1
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result1
)):
if
len
(
result1
[
i
])
!=
0
:
user_detail_address
+=
result1
[
i
][
0
]
# 市
cursor
.
execute
(
sql2
)
result2
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result2
)):
if
len
(
result2
[
i
])
!=
0
:
user_detail_address
+=
result2
[
i
][
0
]
# 区
cursor
.
execute
(
sql3
)
result3
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result3
)):
if
len
(
result3
[
i
])
!=
0
:
user_detail_address
+=
result3
[
i
][
0
]
user_detail_address
+=
detail_address
cursor
.
close
()
db
.
cursor
()
return
user_detail_address
#########################################################################
def
crm
():
db
=
mysql
.
connector
.
connect
(
host
=
"master.db2.liexindev.me"
,
user
=
"liexin_crmv2"
,
password
=
"liexin_crmv2#zsyM"
,
database
=
"liexin_crmv2"
)
cursor
=
db
.
cursor
()
cursor
.
execute
(
"select com_name, business_license_src, creator_id, creator_name, id, com_addr from lie_company where com_category in (0, 1)"
)
result
=
cursor
.
fetchall
()
logging
.
info
(
"---------start crm-----------"
)
for
row
in
result
:
com_name
=
row
[
0
]
business_license_src
=
row
[
1
]
creator_id
=
row
[
2
]
creator_name
=
row
[
3
]
id
=
row
[
4
]
com_addr
=
row
[
5
]
json_map
=
{
"company_name_cn"
:
com_name
,
# com_name
"business_license"
:
business_license_src
,
# business_license_src
"source_system_id"
:
"3"
,
# 1供应商 2供应链 3crm 4一体化 5采购系统 6-订单系统
"create_uid"
:
creator_id
,
# creator_id
"create_name"
:
creator_name
,
# creator_name
"address_data"
:
[
{
"source_id"
:
"company_"
+
str
(
id
),
# id
"source_code"
:
""
,
"address_type"
:
"公司地址"
,
"address"
:
com_addr
# com_addr
}
],
"is_delete"
:
0
}
if
len
(
com_addr
)
==
0
:
json_map
[
"address_data"
]
=
[]
print
(
json_map
)
post_data_to_update_address
(
json_map
)
print
(
"-------------------------crm已更新完毕!--------------------------------"
)
logging
.
info
(
"-------------------------crm已更新完毕!--------------------------------"
)
cursor
.
close
()
db
.
close
()
def
post_data_to_update_address
(
data
:
dict
):
data_str
=
json
.
dumps
(
data
)
logging
.
info
(
data
)
response
=
requests
.
post
(
url
=
dev_url
,
data
=
data_str
,
headers
=
{
"Content-Type"
:
"application/json"
})
print
(
"======="
+
str
(
response
.
status_code
)
+
"======"
)
logging
.
info
(
str
(
response
.
status_code
))
if
__name__
==
'__main__'
:
crm
()
release_liexin.py
0 → 100644
View file @
bcd55da6
import
json
import
mysql.connector
import
requests
import
logging
# 线上地址需要更改
dev_url
=
"http://united_data.liexindev.net/sync/Address/updateAddress"
#测试环境url
prod_url
=
""
#生产环境
#日志配置
logging
.
basicConfig
(
format
=
'
%(asctime)
s -
%(filename)
s[line:
%(lineno)
d] -
%(levelname)
s:
%(message)
s'
,
level
=
logging
.
INFO
,
filename
=
'data_liexin.log'
,
filemode
=
'a'
)
def
get_user_detail_address_by_code
(
province
:
int
,
city
:
int
,
district
:
int
,
detail_address
:
str
)
->
str
:
db
=
mysql
.
connector
.
connect
(
host
=
"232.db.liexindev.me"
,
user
=
"liexin"
,
password
=
"liexin#zsyM"
,
database
=
"liexin"
)
sql1
=
"select region_name from lie_region where region_id = "
+
str
(
province
)
sql2
=
"select region_name from lie_region where region_id = "
+
str
(
city
)
sql3
=
"select region_name from lie_region where region_id = "
+
str
(
district
)
user_detail_address
=
""
cursor
=
db
.
cursor
()
# 省
cursor
.
execute
(
sql1
)
result1
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result1
)):
if
len
(
result1
[
i
])
!=
0
:
user_detail_address
+=
result1
[
i
][
0
]
# 市
cursor
.
execute
(
sql2
)
result2
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result2
)):
if
len
(
result2
[
i
])
!=
0
:
user_detail_address
+=
result2
[
i
][
0
]
# 区
cursor
.
execute
(
sql3
)
result3
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result3
)):
if
len
(
result3
[
i
])
!=
0
:
user_detail_address
+=
result3
[
i
][
0
]
user_detail_address
+=
detail_address
cursor
.
close
()
db
.
cursor
()
return
user_detail_address
######################################################
def
liexin
():
db
=
mysql
.
connector
.
connect
(
host
=
"232.db.liexindev.me"
,
user
=
"liexin"
,
password
=
"liexin#zsyM"
,
database
=
"liexin"
)
cursor
=
db
.
cursor
()
cursor
.
execute
(
"select tax_title, business_license, tax_id, user_sn, company_address, consignee_province, consignee_city, consignee_district, consignee_address, uc_id from lie_taxinfo"
)
result
=
cursor
.
fetchall
()
logging
.
info
(
"start liexin"
)
for
row
in
result
:
tax_title
=
row
[
0
]
business_license
=
row
[
1
]
tax_id
=
row
[
2
]
user_sn
=
row
[
3
]
company_address
=
row
[
4
]
consignee_province
=
row
[
5
]
consignee_city
=
row
[
6
]
consignee_district
=
row
[
7
]
consignee_address
=
row
[
8
]
uc_id
=
row
[
9
]
# 获取收票地址
tax_detail_address
=
get_user_detail_address_by_code
(
consignee_province
,
consignee_city
,
consignee_district
,
consignee_address
)
# 获取客户地址
sql_user_address
=
"select province, city, district, detail_address, address_type from lie_user_address where uc_id = "
+
str
(
uc_id
)
cursor
.
execute
(
sql_user_address
)
user_address_result
=
cursor
.
fetchall
()
user_detail_address_list
=
[]
for
i
in
range
(
len
(
user_address_result
)):
province
=
user_address_result
[
i
][
0
]
city
=
user_address_result
[
i
][
1
]
district
=
user_address_result
[
i
][
2
]
detail_address
=
user_address_result
[
i
][
3
]
address_type
=
user_address_result
[
i
][
4
]
user_address
=
get_user_detail_address_by_code
(
province
,
city
,
district
,
detail_address
)
user_detail_address_list
.
append
(
user_address
)
json_map
=
{
"company_name_cn"
:
tax_title
,
# tax_title
"business_license"
:
business_license
,
# business_license
"source_system_id"
:
"3"
,
# ????? # 1供应商 2供应链 3crm 4一体化 5采购系统 6-订单系统
"create_uid"
:
"1000"
,
#
"create_name"
:
"admin"
,
#
"address_data"
:
[],
"is_delete"
:
0
}
# 发票地址不为空,加入发票地址信息
if
len
(
company_address
)
!=
0
:
json_map
[
"address_data"
]
.
append
({
"source_id"
:
"tax_id_"
+
str
(
tax_id
),
"source_code"
:
user_sn
,
"address_type"
:
"发票地址"
,
"address"
:
company_address
})
# 收票地址不为空,加入收票地址信息
if
len
(
tax_detail_address
)
!=
0
:
json_map
[
"address_data"
]
.
append
({
"source_id"
:
"tax_invoice_collection_address_"
+
str
(
tax_id
),
"source_code"
:
user_sn
,
"address_type"
:
"收票地址"
,
"address"
:
tax_detail_address
})
# 客户地址信息列表不为空 加入客户地址信息
for
i
in
range
(
len
(
user_detail_address_list
)):
if
len
(
user_detail_address_list
[
i
])
==
0
:
continue
address_type_str
=
"收票地址"
if
address_type
==
0
:
address_type_str
=
"收货地址"
json_map
[
"address_data"
]
.
append
({
"source_id"
:
"crm_invoice_user_"
+
str
(
tax_id
),
"source_code"
:
user_sn
,
"address_type"
:
address_type_str
,
"address"
:
user_detail_address_list
[
i
]
})
print
(
json_map
)
post_data_to_update_address
(
json_map
)
print
(
"-------------------------liexin已更新!--------------------------------"
)
logging
.
info
(
"-------------------------liexin已更新!--------------------------------"
)
cursor
.
close
()
db
.
close
()
def
post_data_to_update_address
(
data
:
dict
):
data_str
=
json
.
dumps
(
data
)
logging
.
info
(
data
)
response
=
requests
.
post
(
url
=
dev_url
,
data
=
data_str
,
headers
=
{
"Content-Type"
:
"application/json"
})
print
(
"======="
+
str
(
response
.
status_code
)
+
"======"
)
logging
.
info
(
str
(
response
.
status_code
))
if
__name__
==
'__main__'
:
liexin
()
release_pur.py
0 → 100644
View file @
bcd55da6
import
json
import
mysql.connector
import
requests
import
logging
# 线上地址需要更改
dev_url
=
"http://united_data.liexindev.net/sync/Address/updateAddress"
#测试环境url
prod_url
=
""
#生产环境
#日志配置
logging
.
basicConfig
(
format
=
'
%(asctime)
s -
%(filename)
s[line:
%(lineno)
d] -
%(levelname)
s:
%(message)
s'
,
level
=
logging
.
INFO
,
filename
=
'data_pur.log'
,
filemode
=
'a'
)
def
get_user_detail_address_by_code
(
province
:
int
,
city
:
int
,
district
:
int
,
detail_address
:
str
)
->
str
:
db
=
mysql
.
connector
.
connect
(
host
=
"232.db.liexindev.me"
,
user
=
"liexin"
,
password
=
"liexin#zsyM"
,
database
=
"liexin"
)
sql1
=
"select region_name from lie_region where region_id = "
+
str
(
province
)
sql2
=
"select region_name from lie_region where region_id = "
+
str
(
city
)
sql3
=
"select region_name from lie_region where region_id = "
+
str
(
district
)
user_detail_address
=
""
cursor
=
db
.
cursor
()
# 省
cursor
.
execute
(
sql1
)
result1
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result1
)):
if
len
(
result1
[
i
])
!=
0
:
user_detail_address
+=
result1
[
i
][
0
]
# 市
cursor
.
execute
(
sql2
)
result2
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result2
)):
if
len
(
result2
[
i
])
!=
0
:
user_detail_address
+=
result2
[
i
][
0
]
# 区
cursor
.
execute
(
sql3
)
result3
=
cursor
.
fetchall
()
for
i
in
range
(
len
(
result3
)):
if
len
(
result3
[
i
])
!=
0
:
user_detail_address
+=
result3
[
i
][
0
]
user_detail_address
+=
detail_address
cursor
.
close
()
db
.
cursor
()
return
user_detail_address
def
pur
():
db
=
mysql
.
connector
.
connect
(
host
=
"master.db2.liexindev.me"
,
user
=
"liexin_purchase"
,
password
=
"liexin_purchase#zsyM"
,
database
=
"liexin_purchase"
)
cursor
=
db
.
cursor
()
cursor
.
execute
(
"select end_cust_cn, end_cust_en, create_uid, create_name, end_cust_id, com_addr from lie_end_cust_info"
)
result
=
cursor
.
fetchall
()
logging
.
info
(
"start pur"
)
for
row
in
result
:
end_cust_cn
=
row
[
0
]
end_cust_en
=
row
[
1
]
create_uid
=
row
[
2
]
create_name
=
row
[
3
]
end_cust_id
=
row
[
4
]
com_addr
=
row
[
5
]
json_map_cn
=
{
"company_name_cn"
:
end_cust_cn
,
"business_license"
:
""
,
"source_system_id"
:
"5"
,
# ???? 来源系统1供应商 2供应链 3crm 4一体化 5采购系统 6-订单系统
"create_uid"
:
create_uid
,
# ok
"create_name"
:
create_name
,
# ok
"address_data"
:
[{
"source_id"
:
"end_cust_id_"
+
str
(
end_cust_id
),
#也是主键id
"source_code"
:
""
,
"address_type"
:
"终端地址"
,
"address"
:
com_addr
}],
"is_delete"
:
0
}
json_map_en
=
{
"company_name_cn"
:
end_cust_en
,
"business_license"
:
""
,
"source_system_id"
:
"5"
,
# ???? 来源系统1供应商 2供应链 3crm 4一体化 5采购系统 6-订单系统
"create_uid"
:
create_uid
,
# ok
"create_name"
:
create_name
,
# ok
"address_data"
:
[{
"source_id"
:
"end_cust_id_"
+
str
(
end_cust_id
),
#也是主键id
"source_code"
:
""
,
"address_type"
:
"终端地址"
,
"address"
:
com_addr
}],
"is_delete"
:
0
}
print
(
json_map_cn
)
print
(
json_map_en
)
post_data_to_update_address
(
json_map_cn
)
post_data_to_update_address
(
json_map_en
)
print
(
"-------------------------pur已更新!--------------------------------"
)
logging
.
info
(
"-------------------------pur已更新!--------------------------------"
)
cursor
.
close
()
db
.
close
()
def
post_data_to_update_address
(
data
:
dict
):
data_str
=
json
.
dumps
(
data
)
logging
.
info
(
data
)
response
=
requests
.
post
(
url
=
dev_url
,
data
=
data_str
,
headers
=
{
"Content-Type"
:
"application/json"
})
print
(
"======="
+
str
(
response
.
status_code
)
+
"======"
)
logging
.
info
(
str
(
response
.
status_code
))
if
__name__
==
'__main__'
:
pur
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment