Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
lichenggang
/
tas_token_server
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
34c23673
authored
Mar 19, 2020
by
lichenggang
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
添加digikey_api的token对接
parent
73f224f8
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
86 additions
and
0 deletions
lib/apidigikey.py
lib/apidigikey.py
0 → 100644
View file @
34c23673
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import
traceback
import
redis
import
requests
import
token_lib
PLATFORM
=
"apidigikey"
LEVEL
=
"HH"
BACK_NUM
=
0
MODE
=
"redis"
MAIN_NUM
=
THREAD_NUM
=
1
p_headers
=
{
'Host'
:
'api.digikey.com'
,
'Content-Type'
:
'application/x-www-form-urlencoded'
}
p_url
=
"https://api.digikey.com/v1/oauth2/token"
redirect_uri
=
'https://bigdata.ichunt.com/digikey_auth.php'
# client_id = '9zwA2tsTFpKURpMIj5VXGK3eoPWLIGgh'
# client_secret = 'CxRLozO8hZ2GvCJ1'
# 对接digikey的api的token不放在master而放在这台机器上
apidigikey_ip
=
"172.18.137.38"
psd
=
'icDb29mLy2s'
class
DGTokenCrawler
(
token_lib
.
BaseTokenCrawler
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
()
.
__init__
(
*
args
,
**
kwargs
)
self
.
token_key
=
"digikey_access_token"
self
.
refresh_token_key
=
"digikey_refresh_token"
self
.
client_id
=
'9zwA2tsTFpKURpMIj5VXGK3eoPWLIGgh'
self
.
client_secret
=
'CxRLozO8hZ2GvCJ1'
self
.
redirect_uri
=
'https://bigdata.ichunt.com/digikey_auth.php'
self
.
_pool
=
redis
.
ConnectionPool
(
host
=
apidigikey_ip
,
port
=
6379
,
db
=
0
,
password
=
psd
)
self
.
_redis
=
redis
.
Redis
(
connection_pool
=
self
.
_pool
)
def
get_params_list
(
self
):
return
[[
i
]
for
i
in
range
(
THREAD_NUM
)]
def
collect_token
(
self
):
try
:
access_token
,
refresh_token
=
self
.
get_token_by_refresh_token
()
return
access_token
,
refresh_token
except
Exception
:
self
.
logger
.
error
(
traceback
.
format_exc
())
def
get_token
(
self
):
# url="https://api.digikey.com/v1/oauth2/authorize?response_type=code&client_id=9zwA2tsTFpKURpMIj5VXGK3eoPWLIGgh&redirect_uri=https://bigdata.ichunt.com/digikey_auth.php"
# url="https://sandbox-api.digikey.com/v1/oauth2/authorize?client_id=Qe3ffZGQ9E5fTsOFiz4Cv69IzlFGpzvj&response_type=code&redirect_uri=https://bigdata.ichunt.com/digikey_auth_sandbox.php"
p_data
=
{
'code'
:
'XgjehVDz'
,
'client_id'
:
self
.
client_id
,
'client_secret'
:
self
.
client_secret
,
'redirect_uri'
:
self
.
redirect_uri
,
'grant_type'
:
'authorization_code'
}
# sess.max_redirects=0
resp
=
requests
.
post
(
p_url
,
headers
=
p_headers
,
data
=
p_data
)
token_json
=
resp
.
json
()
return
token_json
[
'access_token'
],
token_json
[
'refresh_token'
]
def
get_token_by_refresh_token
(
self
):
refresh_token
=
self
.
_redis
.
get
(
self
.
refresh_token_key
)
.
decode
()
p_data
=
{
'client_id'
:
self
.
client_id
,
'client_secret'
:
self
.
client_secret
,
'refresh_token'
:
refresh_token
,
# 'refresh_token': 'jwki255qAu14VPDGhuJKQVPHixiAGO1D',
'grant_type'
:
'refresh_token'
}
resp
=
requests
.
post
(
p_url
,
headers
=
p_headers
,
data
=
p_data
)
token_json
=
resp
.
json
()
return
token_json
[
'access_token'
],
token_json
[
'refresh_token'
]
def
process
(
self
,
is_main
):
access_token
,
refresh_token
=
self
.
collect_token
()
print
(
access_token
)
print
(
refresh_token
)
self
.
_redis
.
set
(
self
.
token_key
,
access_token
)
self
.
_redis
.
set
(
self
.
refresh_token_key
,
refresh_token
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment