电子文档交易市场
安卓APP | ios版本
电子文档交易市场
安卓APP | ios版本

Python使用Beanstalkd做异步任务处理的方法

5页
  • 卖家[上传人]:m****
  • 文档编号:45514645
  • 上传时间:2018-06-17
  • 文档格式:DOCX
  • 文档大小:19.77KB
  • / 5 举报 版权申诉 马上下载
  • 文本预览
  • 下载提示
  • 常见问题
    • 1、Python 使用 Beanstalkd 做异步任务处理的方法使用 Beanstalkd 作为消息队列服务,然后结合 Python 的装饰器语法实现一个简单的异步任务处理工具.最终效果定义任务:from xxxxx.job_queue import JobQueuequeue = JobQueue() queue.task(task_tube_one) def task_one(arg1, arg2, arg3): # do task提交任务:task_one.put(arg1=“a“, arg2=“b“, arg3=“c“)然后就可以由后台的 work 线程去执行这些任务了。实现过程1、了解 Beanstalk ServerBeanstalk is a simple, fast work queue. https:/ 是一个 C 语言实现的消息队列服务。 它提供了通用的接口,最初设计的目的是通过异步运行耗时的任务来减少大量 Web 应用程序中的页面延迟。针对不同的语言,有不同的 Beanstalkd Client 实现。 Python 里就有 beanstalkc 等。我就是利用 b

      2、eanstalkc 来作为与 beanstalkd server 通信的工具。2、任务异步执行实现原理beanstalkd 只能进行字符串的任务调度。为了让程序支持提交函数和参数,然后由 woker 执行函数并携带参数。需要一个中间层来将函数与传递的参数注册。实现主要包括 3 个部分:Subscriber: 负责将函数注册到 beanstalk 的一个 tube 上,实现很简单,注册函数名和函数本身的对应关系。(也就意味着同一个分组(tube)下不能有相同函数名存在)。数据存储在类变量里。class Subscriber(object): FUN_MAP = defaultdict(dict) def _init_(self, func, tube): logger.info(register func: to tube:.format(func._name_, tube)Subscriber.FUN_MAPtubefunc._name_ = funcJobQueue: 方便将一个普通函数转换为具有 Putter 能力的装饰器class JobQueue(object): classm

      3、ethod def task(cls, tube): def wrapper(func): Subscriber(func, tube) return Putter(func, tube) return wrapperPutter: 将函数名、函数参数、指定的分组组合为一个对象,然后 json 序列化为字符串,最后通过 beanstalkc 推送到 beanstalkd 队列。class Putter(object): def _init_(self, func, tube): self.func = funcself.tube = tube # 直接调用返回 def _call_(self, *args, *kwargs): return self.func(*args, *kwargs) # 推给离线队列 def put(self, *kwargs): args = func_name: self.func._name_, tube: self.tube, kwargs: kwargslogger.info(put job: to queue.format(args)beansta

      4、lk = beanstalkc.Connection(host=BEANSTALK_CONFIGhost, port=BEANSTALK_CONFIGport) try:beanstalk.use(self.tube)job_id = beanstalk.put(json.dumps(args) return job_id finally:beanstalk.close()Worker: 从 beanstalkd 队列中取出字符串,然后通过 json.loads 反序列化为对象,获得 函数名、参数和 tube。最后从 Subscriber 中获得 函数名对应的函数代码,然后传递参数执行函数。class Worker(object): worker_id = 0 def _init_(self, tubes): self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIGhost, port=BEANSTALK_CONFIGport)self.tubes = tubesself.reserve_timeout = 20 self

      5、.timeout_limit = 1000 self.kick_period = 600 self.signal_shutdown = False self.release_delay = 0 self.age = 0 self.signal_shutdown = False signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown()Worker.worker_id += 1 import_module_by_str(pear.web.controllers.controller_crawler) def subscribe(self): if isinstance(self.tubes, list): for tube in self.tubes: if tube not in Subscriber.FUN_MAP.keys():logger.error(tube: not register!.format(tube) continue self.beanstalk.watch(tube)

      6、else: if self.tubes not in Subscriber.FUN_MAP.keys():logger.error(tube: not register!.format(self.tubes) return self.beanstalk.watch(self.tubes) def run(self): self.subscribe() while True: if self.signal_shutdown: break if self.signal_shutdown:logger.info(“graceful shutdown“) break job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout if not job: continue try:self.on_job(job)self.delete_job(job) except beanstalkc.CommandFailed as e:logger.warning(e, exc_info=1) except

      7、Exception as e:logger.error(e)kicks = job.stats()kicks if kicks 3:self.bury_job(job) else:message = json.loads(job.body)logger.error(“Kicks reach max. Delete the job“, extra=body: message)self.delete_job(job) classmethod def on_job(cls, job): start = time.time()msg = json.loads(job.body)logger.info(msg)tube = msg.get(tube)func_name = msg.get(func_name) try:func = Subscriber.FUN_MAPtubefunc_namekwargs = msg.get(kwargs)func(*kwargs)logger.info(u-.format(func, kwargs) except Exception as e:logger.e

      8、rror(e.message, exc_info=True)cost = time.time() - startlogger.info( cost s.format(func_name, cost) classmethod def delete_job(cls, job): try:job.delete() except beanstalkc.CommandFailed as e:logger.warning(e, exc_info=1) classmethod def bury_job(cls, job): try:job.bury() except beanstalkc.CommandFailed as e:logger.warning(e, exc_info=1) def graceful_shutdown(self): self.signal_shutdown = True写上面代码的时候,发现一个问题:通过 Subscriber 注册函数名和函数本身的对应关系,是在一个 Python 解释器,也就是在一个进程里运行的,而 Worker 又是异步在另外的进程运行,怎么样才能让 Worker 也能拿到和 Putter 一样的 Subscriber。最后发现通过 Python 的装饰器机制可以解决这个问题。就是这句解决了 Subscriber 的问题import_module_by_str(pear.web.controllers.controller_crawler)# import_module_by_str 的实现 def import_module_by_str(module_name): if isinstance(module_name, unicode):module_name = str(module_name)_import_(module_name)执行 import_module_by_str 时, 会调用 _import_ 动态加载类和函数。将使用了 JobQueue 的函数所在模块加载到内存之后。当 运行 Woker 时,Python 解释器就会先执行 修饰的装饰器代码,也就会把 Subscriber 中的对应关系加载到内存。

      《Python使用Beanstalkd做异步任务处理的方法》由会员m****分享,可在线阅读,更多相关《Python使用Beanstalkd做异步任务处理的方法》请在金锄头文库上搜索。

      点击阅读更多内容
    最新标签
    监控施工 信息化课堂中的合作学习结业作业七年级语文 发车时刻表 长途客运 入党志愿书填写模板精品 庆祝建党101周年多体裁诗歌朗诵素材汇编10篇唯一微庆祝 智能家居系统本科论文 心得感悟 雁楠中学 20230513224122 2022 公安主题党日 部编版四年级第三单元综合性学习课件 机关事务中心2022年全面依法治区工作总结及来年工作安排 入党积极分子自我推荐 世界水日ppt 关于构建更高水平的全民健身公共服务体系的意见 空气单元分析 哈里德课件 2022年乡村振兴驻村工作计划 空气教材分析 五年级下册科学教材分析 退役军人事务局季度工作总结 集装箱房合同 2021年财务报表 2022年继续教育公需课 2022年公需课 2022年日历每月一张 名词性从句在写作中的应用 局域网技术与局域网组建 施工网格 薪资体系 运维实施方案 硫酸安全技术 柔韧训练 既有居住建筑节能改造技术规程 建筑工地疫情防控 大型工程技术风险 磷酸二氢钾 2022年小学三年级语文下册教学总结例文 少儿美术-小花 2022年环保倡议书模板六篇 2022年监理辞职报告精选 2022年畅想未来记叙文精品 企业信息化建设与管理课程实验指导书范本 草房子读后感-第1篇 小数乘整数教学PPT课件人教版五年级数学上册 2022年教师个人工作计划范本-工作计划 国学小名士经典诵读电视大赛观后感诵读经典传承美德 医疗质量管理制度 2
    关于金锄头网 - 版权申诉 - 免责声明 - 诚邀英才 - 联系我们
    手机版 | 川公网安备 51140202000112号 | 经营许可证(蜀ICP备13022795号)
    ©2008-2016 by Sichuan Goldhoe Inc. All Rights Reserved.