Commit 852ee867 by lichenggang

'listen'

parent 9bc7ba58
Showing with 31 additions and 16 deletions
......@@ -21,15 +21,19 @@ partid_pattern = re.compile('partId=(.*)')
# Cache of WebDriver instances; gen_table_doms looks drivers up and stores them here keyed by os.getpid().
pid_driver_map = dict()
# Logger taken from the project's Mylogger wrapper, constructed with 'error.log'.
logger = Mylogger('error.log').logger
# Copy of the PhantomJS desired-capabilities dict; customised below and passed to webdriver.PhantomJS.
dcap = dict(DesiredCapabilities.PHANTOMJS)
# NOTE(review): the user-agent assignment appears twice with the same value (one-line form, then wrapped form);
# presumably these are the before/after sides of a reformat in this diff - confirm only one is kept.
dcap["phantomjs.page.settings.userAgent"] = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36") # set the User-Agent request header
dcap["phantomjs.page.settings.userAgent"] = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36") # set the User-Agent request header
# Switch off image loading in the PhantomJS page settings.
dcap["phantomjs.page.settings.loadImages"] = False
def gen_table_doms(url, cat_id):
pid = os.getpid()
if pid in pid_driver_map:
driver = pid_driver_map.get(pid)
else:
if ENV == 'test':
driver = webdriver.Chrome(options=chrome_options, executable_path='C:/Users/ICHUNT/Desktop/chromedriver.exe')
driver = webdriver.Chrome(options=chrome_options,
executable_path='C:/Users/ICHUNT/Desktop/chromedriver.exe')
else:
driver = webdriver.PhantomJS(desired_capabilities=dcap)
pid_driver_map[pid] = driver
......@@ -61,9 +65,11 @@ def gen_table_doms(url, cat_id):
good['cat_id'] = cat_id
producer.send_task(json.dumps(good))
print(good)
return 1
except Exception:
err = traceback.format_exc()
logger.error(err[err.rfind('\n', 0, -2):].strip() + ' - page: ' + str(page) + ' - ' + 'url: ' + url)
return 0
def parse_text(text):
......@@ -88,7 +94,8 @@ def parse_text(text):
obj['from'] = fm[0].text.strip() if fm else ''
stock = tr.xpath('.//p[@class="m-font-b"][2]')
obj['goods_number'] = int(stock[0].text.replace('库存数:', '').replace(',', '')) if stock else 0
obj['min_buynum'] = int(tr.xpath('.//td[3]//input[2]/@value')[0]) if tr.xpath('.//td[3]//input[2]/@value') else 1
obj['min_buynum'] = int(tr.xpath('.//td[3]//input[2]/@value')[0]) if tr.xpath(
'.//td[3]//input[2]/@value') else 1
obj['increment'] = int(tr.xpath('.//td[3]//input[3]/@value')[0]) if tr.xpath('.//td[3]//input[3]/@value') else 1
price_doms = tr.xpath('.//td[4]//tbody//tr')
obj['prices'] = []
......@@ -141,6 +148,7 @@ def parse_text1(text):
# Placeholder task handler: accepts a task and does nothing, so it implicitly returns None.
def schedule(task):
pass
def get_url():
pool = redis.ConnectionPool(**get_redis_conf())
......@@ -154,6 +162,7 @@ def get_url():
else:
break
class Task(object):
def __init__(self):
pool = redis.ConnectionPool(**get_redis_conf())
......@@ -164,19 +173,21 @@ class Task(object):
# Consume tasks from the Redis list named by self.queue, one per loop iteration, until a handler returns a falsy flag.
# NOTE(review): two consecutive tup_task/flag pairs appear below; they read like the removed (schedule) and
# added (gen_table_doms) sides of this diff - confirm which pair is live before relying on either.
def listen_task(self):
while True:
# blpop is called with timeout 0; element [1] of its result is decoded and parsed as JSON.
bin_task = json.loads(self.rcon.blpop(self.queue, 0)[1].decode())
# Variant 1: the task is indexed positionally and handed to schedule().
tup_task = (bin_task[0],bin_task[1])
flag=schedule(tup_task)
# Variant 2: the task is indexed by 'req_key' and 'extra' -> 'cat_id', then unpacked into gen_table_doms(url, cat_id).
tup_task = (bin_task['req_key'], bin_task['extra']['cat_id'])
flag = gen_table_doms(*tup_task)
# A falsy flag ends the listening loop.
if not flag:
break
# Script entry point: build a Task and run its listen_task() loop.
# NOTE(review): the live `for req in get_url()` loop below and its commented-out copy further down look like the
# removed and added sides of this diff - presumably only the commented-out form remains in the new revision; confirm.
if __name__ == '__main__':
Task().listen_task()
# p = Pool(3)
for req in get_url():
# for req in [('https://www.chip1stop.com/HKG/zh/view/searchResult/SearchResultTop?classCd=081015&classLv=3&subWinSearchFlg=false&searchType=2&dispAllFlg=true&searchFlg=false&direct_Flg=true',0)]:
url = req[0]
cat_id = req[1]
# p.apply_async(gen_table_doms, args=(url, cat_id))
gen_table_doms(url,cat_id)
# for req in get_url():
# # for req in [('https://www.chip1stop.com/HKG/zh/view/searchResult/SearchResultTop?classCd=081015&classLv=3&subWinSearchFlg=false&searchType=2&dispAllFlg=true&searchFlg=false&direct_Flg=true',0)]:
# url = req[0]
# cat_id = req[1]
# # p.apply_async(gen_table_doms, args=(url, cat_id))
# gen_table_doms(url,cat_id)
# p.close()
# p.join()
......@@ -35,7 +35,8 @@ class Base:
class Producer(Base):
tries=1
tries = 1
def __init__(self, data_queue=None, durable=True, exchange=None, exchange_type=None):
config = get_mq_conf()
super().__init__(config['user'], config['password'],
......@@ -52,14 +53,17 @@ class Producer(Base):
properties=pika.BasicProperties(delivery_mode=2), # make message persistent
body=body)
except Exception as e:
if self.tries<=3:
self.__init__(self,self.data_queue,self.durable,self.exchange)
if self.tries <= 10:
self.__init__(self, self.data_queue, self.durable, self.exchange)
self.send_task(body)
self.tries+=1
self.tries += 1
else:
raise e
# Replace the current channel: close self.ch, open a new channel on the existing self.conn,
# and declare the data queue on it again with the stored durable flag.
def re_conn_channel(self):
self.ch.close()
self.ch = self.conn.channel()
self.ch.queue_declare(queue=self.data_queue, durable=self.durable)
# Module-level Producer instance, created at import time with data_queue='chip1stop_new_goods_store'.
producer = Producer('chip1stop_new_goods_store')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment