博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python定时任务模块APScheduler
阅读量:5331 次
发布时间:2019-06-14

本文共 7387 字,大约阅读时间需要 24 分钟。

一、简单任务

定义一个函数,然后定义一个scheduler类型,添加一个job,然后执行,就可以了

5秒整倍数,就执行这个函数

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimedef aps_test():    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, trigger='cron', second='*/5')scheduler.start()

带参数的

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimedef aps_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5')scheduler.start()
# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimedef aps_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3)scheduler.start()

二、日志

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimeimport logginglogging.basicConfig(level=logging.INFO,                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',                    datefmt='%Y-%m-%d %H:%M:%S',                    filename='log1.txt',                    filemode='a')def aps_test(x):    print 1/0    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')scheduler._logger = loggingscheduler.start()

三、删除任务

要求执行一定阶段任务以后,删除某一个循环任务,其他任务照常进行。有如下代码:

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimeimport logginglogging.basicConfig(level=logging.INFO,                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',                    datefmt='%Y-%m-%d %H:%M:%S',                    filename='log1.txt',                    filemode='a')def aps_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def aps_date(x):    scheduler.remove_job('interval_task')    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)    scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')scheduler.add_job(func=aps_date, args=('一次性任务,删除循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task')scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')scheduler._logger = loggingscheduler.start()

四、停止任务,恢复任务

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimeimport logginglogging.basicConfig(level=logging.INFO,                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',                    datefmt='%Y-%m-%d %H:%M:%S',                    filename='log1.txt',                    filemode='a')def aps_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def aps_pause(x):    scheduler.pause_job('interval_task')    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def aps_resume(x):    scheduler.resume_job('interval_task')    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')scheduler.add_job(func=aps_pause, args=('一次性任务,停止循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task')scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task')scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')scheduler._logger = loggingscheduler.start()

五、捕获错误

# coding:utf-8from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERRORimport datetimeimport logginglogging.basicConfig(level=logging.INFO,                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',                    datefmt='%Y-%m-%d %H:%M:%S',                    filename='log1.txt',                    filemode='a')def aps_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def date_test(x):    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)    print (1/0)def my_listener(event):    if event.exception:        print ('任务出错了!!!!!!')    else:        print ('任务照常运行...')scheduler = BlockingScheduler()scheduler.add_job(func=date_test, args=('一定性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)scheduler._logger = loggingscheduler.start()

 六、定时任务的接口设计

# 定时任务功能@admin.route('/pause', methods=['POST'])@user_login_reqdef pausetask():  # 停止    data = request.form.get('task_id')    job = scheduler.get_job(str(data))    res = {}    if job:        if 'pause' in job.__str__():            res.update({
'status': 1001, 'msg': '已停止'}) else: scheduler.pause_job(str(data)) res.update({
'status': 1000, 'msg': '停止中'}) else: res.update({
'status': 1001, 'msg': '未运行'}) return jsonify(res)@admin.route('/resume', methods=['POST'])@user_login_reqdef resumetask(): # 恢复 data = request.form.get('task_id') job=scheduler.get_job(str(data)) res={} if job: if 'run' in job.__str__(): res.update({
'status':1001,'msg':'已恢复'}) else: scheduler.resume_job(str(data)) res.update({
'status': 1000, 'msg': '恢复中'}) else: res.update({
'status':1001,'msg':'未运行'}) return jsonify(res)@admin.route('/remove_task', methods=['POST'])@user_login_reqdef remove_task(): # 移除 data = request.form['task_id'] job = scheduler.get_job(str(data)) res = {} if not job: res.update({
'status': 1001, 'msg': '已删除'}) else: scheduler.remove_job(str(data)) res.update({
'status': 1000, 'msg': '删除中'}) return jsonify(res)@admin.route('/addjob', methods=['POST'])@user_login_reqdef addtask(): data = request.form.get('task_id') job = scheduler.get_job(str(data)) if job: return jsonify({
'status': 1001,'msg':'已开启'}) if data == '1': scheduler.add_job(func=task1, id='1', trigger='cron', day_of_week='0-6', hour=18, minute=19, second=10, replace_existing=True) # trigger='cron' 表示是一个定时任务 else: scheduler.add_job(func=task2, id='2', trigger='interval', seconds=10, replace_existing=True) # trigger='interval' 表示是一个循环任务,每隔多久执行一次 return jsonify({
'status': 1000,'msg':'运行中'})def task1(): print('mession1') print(datetime.datetime.now())def task2(): print('mession2') print(datetime.datetime.now())

 

转载于:https://www.cnblogs.com/angelyan/p/11297252.html

你可能感兴趣的文章
[转载]电脑小绝技
查看>>
windos系统定时执行批处理文件(bat文件)
查看>>
thinkphp如何实现伪静态
查看>>
BZOJ 2243: [SDOI2011]染色( 树链剖分 )
查看>>
BZOJ 1925: [Sdoi2010]地精部落( dp )
查看>>
c++中的string常用函数用法总结!
查看>>
界面交互之支付宝生活圈pk微信朋友圈
查看>>
[DLX精确覆盖+打表] hdu 2518 Dominoes
查看>>
SuperMap iServerJava 6R扩展领域开发及压力测试---判断点在那个面内(1)
查看>>
Week03-面向对象入门
查看>>
一个控制台程序,模拟机器人对话
查看>>
web.xml 中加载顺序
查看>>
pycharm激活地址
查看>>
hdu 1207 四柱汉诺塔
查看>>
Vue 2.x + Webpack 3.x + Nodejs 多页面项目框架(上篇——纯前端多页面)
查看>>
display:none与visible:hidden的区别
查看>>
我的PHP学习之路
查看>>
【题解】luogu p2340 奶牛会展
查看>>
对PostgreSQL的 SPI_prepare 的理解。
查看>>
解决响应式布局下兼容性的问题
查看>>