python定时任务schedule库用法详细讲解
作者:IT之一小佬 发布时间:2022-11-01 10:19:29
标签:python,定时任务,schedule库
前言
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。
如果想执行多个任务,也可以添加多个task。
首先安装schedule库:
pip install schedule
1、按时间间隔执行定时任务
示例代码1:
import schedule
from datetime import datetime
def task():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts)
def task2():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + '666!')
def func():
# 清空任务
schedule.clear()
# 创建一个按3秒间隔执行任务
schedule.every(3).seconds.do(task)
# 创建一个按2秒间隔执行任务
schedule.every(2).seconds.do(task2)
while True:
schedule.run_pending()
func()
运行结果:
示例代码2:
import schedule
import time
def job(name):
print("her name is : ", name)
name = "张三"
schedule.every(10).minutes.do(job, name)
schedule.every().hour.do(job, name)
schedule.every().day.at("10:30").do(job, name)
schedule.every(5).to(10).days.do(job, name)
schedule.every().monday.do(job, name)
schedule.every().wednesday.at("13:15").do(job, name)
while True:
schedule.run_pending()
time.sleep(1)
参数解释:
每隔十分钟执行一次任务
每隔一小时执行一次任务
每天的10:30执行一次任务
每隔5到10天执行一次任务
每周一的这个时候执行一次任务
每周三13:15执行一次任务
run_pending:运行所有可以运行的任务
注意:schedule方法是串行的,也就是说,如果各个任务之间时间不冲突,那是没问题的;如果时间有冲突的话,会串行的执行命令。
2、装饰器:通过 @repeat() 装饰静态方法
示例代码:
from datetime import datetime
from schedule import every, repeat, run_pending
@repeat(every(3).seconds)
def task():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + '-333!')
@repeat(every(5).seconds)
def task2():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + "-555555!")
while True:
run_pending()
运行结果:
3、传递参数
示例代码:
from datetime import datetime
import schedule
def task(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
def task2(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
schedule.every(3).seconds.do(task, s='-333')
schedule.every(5).seconds.do(task, s='-555')
while True:
schedule.run_pending()
运行结果:
4、使用装饰器传递参数
示例代码:
from datetime import datetime
from schedule import every, repeat, run_pending
@repeat(every(3).seconds, '-333')
def task(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
@repeat(every(5).seconds, '-555')
def task2(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
while True:
run_pending()
运行结果:
5、取消定时任务
示例代码:
import schedule
i = 0
def some_task():
global i
i += 1
print(i)
if i == 5:
schedule.cancel_job(job)
print('cancel job')
exit(0)
job = schedule.every().second.do(some_task)
while True:
schedule.run_pending()
运行结果:
6、在指定时间执行一次任务
示例代码:
import time
import schedule
def job_that_executes_once():
print('Hello')
return schedule.CancelJob
schedule.every().minute.at(':30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
运行结果:
7、根据标签检索任务
示例代码:
# 检索所有任务:schedule.get_jobs()
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
运行结果:
8、根据标签取消任务
示例代码:
# 取消所有任务:schedule.clear()
import schedule
def greet(name):
print('Hello {}'.format(name))
if name == 'Cancel':
schedule.clear('second-tasks')
print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
schedule.run_pending()
运行结果:
9、运行任务到某时间
示例代码:
import schedule
from datetime import datetime, timedelta, time
def job():
print('working...')
schedule.every().second.until('23:59').do(job) # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止
while True:
schedule.run_pending()
运行结果:
10、马上运行所有任务(主要用于测试)
示例代码:
import schedule
def job():
print('working...')
def job1():
print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3) # 任务间延迟3秒
运行结果:
11、并行运行:使用 Python 内置队列实现
示例代码:
import threading
import time
import schedule
def job1():
print("I'm running on thread %s" % threading.current_thread())
def job2():
print("I'm running on thread %s" % threading.current_thread())
def job3():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
schedule.run_pending()
time.sleep(1)
运行结果:
来源:https://blog.csdn.net/weixin_44799217/article/details/127352957
0
投稿
猜你喜欢
- 单例模式是一个经典设计模式,简要的说,一个类的单例模式就是它只能被实例化一次,实例变量在第一次实例化时就已经固定。 在Python
- 1. 程序背景之前做文件批量移动的时候不小心多加了一个pdf后缀,但问题不大,几行代码就可以搞定~2. 程序要求将以下目录中文件夹中的有问题
- test.asp 测试演示文件clsrsa.asp 实现rsa加密与解密的vbs类文件下面是代码:1. test.asp<%rem 文
- 代码如下:<html> <head> &nb
- 1.使用Paramiko登陆到单台交换机实验拓扑云彩桥接到本机环回接口:192.168.1.1/24三层交换机IP:192.168.1.2/
- 一个简单的验证码爬取程序本文介绍了在Python2.7环境下爬取网站验证码:思路就是获取验证码对应的url,然后发起requst请求,读取该
- Python列表的append()方法踩坑在这之前,我一直认为append()只是个将一个对象添加到列表尾部的方法,但是今天之后,我对它有了
- 源代码如下:#-*- coding:utf-8 -*- def check_exsit(process_name): import win3
- 本次我们选择的安卓游戏对象叫“单词英雄”,大家可以先下载这个游戏。游戏的界面是这样的:通过选择单词的意思进行攻击,选对了就正常攻击,选错了就
- 用Python对数学函数进行求值、求偏导from sympy import *# x = Symbol("x")# y
- 1、实现效果2、实现步骤模块导入import os,sys,timefrom PyQt5 import QtCore,QtWidgets,Q
- 这是一篇关于使用JScript RuntimeObject(MSDN)调试的文章。虽然这些例子中的大多数在其他浏览器中不能运行,但在IE 5
- 本人虽然五音不全,但是听歌还是很喜欢的。希望能利用机器自动制作音乐,本我发现了一个比较适合入门的有趣的开源音乐生成模块 PySynth ,文
- <% dim conn,mdbfile mdbfile=server.mappath("数据库名称.mdb") s
- 如图,今天跑代码的事后遇到的问题,pycharm导入我自己写的各种函数.py文件时有红色标注,显示“no moudle balabala…”
- 登录百度,首先当然是先抓百度的登录包 ,由于是网页登录,最方便的自然是httpwatch了,我使用的测试账号是itiandatest1,密码
- 有时候我们需要判断某一个IP地址是否属于一个网段,以决定该用户能否访问系统.比如用户登录的IP是218.6.7.7,而我们的程序必须判断他是
- 这里介绍几个常用的列表操作添加元素添加元素使用列表的内置方法appendnumber = [1, 2, 3, 4]number.append
- 1、epochKeras官方文档中给出的解释是:“简单说,epochs指的就是训练过程接中数据将被“轮”多少次”(1)释义:训练过程中当一个
- 我想做一个页面,10秒后转向其它页。想在网页中显示10秒的倒计时。谢谢了。对JS不懂 方法一:<html><h