Commit ac071c7b by lichenggang

'update task'

parent 8af9e609
Showing with 27 additions and 5 deletions
......@@ -138,7 +138,8 @@ def parse_text1(text):
return obj
def schedule(task):
pass
def get_url():
pool = redis.ConnectionPool(**get_redis_conf())
......@@ -153,6 +154,19 @@ def get_url():
else:
break
class Task(object):
def __init__(self):
pool = redis.ConnectionPool(**get_redis_conf())
self.rcon = redis.Redis(connection_pool=pool)
self.queue = 'csc_elec_chip1stop_1'
def listen_task(self):
while True:
bin_task = json.loads(self.rcon.blpop(self.queue, 0)[1].decode())
tup_task = (bin_task[0],bin_task[1])
flag=schedule(tup_task)
if __name__ == '__main__':
......
......@@ -35,6 +35,7 @@ class Base:
class Producer(Base):
tries=1
def __init__(self, data_queue=None, durable=True, exchange=None, exchange_type=None):
config = get_mq_conf()
super().__init__(config['user'], config['password'],
......@@ -46,10 +47,17 @@ class Producer(Base):
self.ch.queue_declare(queue=self.data_queue, durable=self.durable)
def send_task(self, body):
self.ch.basic_publish(exchange='', routing_key=self.data_queue,
properties=pika.BasicProperties(delivery_mode=2), # make message persistent
body=body)
try:
self.ch.basic_publish(exchange='', routing_key=self.data_queue,
properties=pika.BasicProperties(delivery_mode=2), # make message persistent
body=body)
except Exception as e:
if self.tries<=3:
self.__init__(self,self.data_queue,self.durable,self.exchange)
self.send_task(body)
self.tries+=1
else:
raise e
def re_conn_channel(self):
self.ch.close()
self.ch = self.conn.channel()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment