Python怎么用sched模块实现定时任务

其他教程   发布日期:2024年11月12日   浏览次数:299

本文小编为大家详细介绍“Python怎么用sched模块实现定时任务”,内容详细,步骤清晰,细节处理妥当,希望这篇“Python怎么用sched模块实现定时任务”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

牛刀小试

我们先来看下面这个案例,代码如下

  1. import sched
  2. import time
  3. def say_hello(name):
  4. print(f"Hello, world, {name}")
  5. scheduler = sched.scheduler(time.time, time.sleep)
  6. scheduler.enter(5, 1, say_hello, ("张三", ))
  7. scheduler.run()

那么上述的代码中,第一步首先则是实例化一个定时器,通过如下的代码

  1. import sched
  2. scheduler = sched.scheduler()

接下来我们通过

  1. enter()
方法来执行定时任务的操作,其中的参数分别是延迟的时间、任务的优先级以及具体的执行函数和执行函数中的参数。像如上的代码就会在延迟5秒钟之后执行
  1. say_hello()
函数

当然要是延迟的时间相等的时候,我们可以设置任务执行的优先级来指定函数方法运行的顺序,例如有如下的代码

  1. import sched
  2. import time
  3. def say_hello(name):
  4. print(f"Hello, world, {name}")
  5. def say_hello_2(name):
  6. print(f"Hello, {name}")
  7. scheduler = sched.scheduler(time.time, time.sleep)
  8. scheduler.enter(5, 2, say_hello, ("张三", ))
  9. scheduler.enter(5, 1, say_hello_2, ("李四", ))
  10. scheduler.run()

如上述代码,尽管延迟的时间都是一样的,但是

  1. say_hello()
方法的优先级明显要比
  1. say_hello_2()
方法要低一些,因此后者会优先执行。

进阶使用

除了让函数延迟执行,我们还可以让其重复执行,具体这样来操作,代码如下

  1. import sched
  2. import time
  3. def say_hello():
  4. print("Hello, world!")
  5. scheduler = sched.scheduler(time.time, time.sleep)
  6. def repeat_task():
  7. scheduler.enter(5, 1, say_hello, ())
  8. scheduler.enter(5, 1, repeat_task, ())
  9. repeat_task()
  10. scheduler.run()

这里我们新建了一个

  1. repeat_task()
自定义函数,调用了
  1. scheduler.enter()
方法5秒钟执行一次之前定义的
  1. say_hello()
函数

在固定时间执行任务

同时我们还可以让任务在指定的时间执行,这里用到

  1. scheduler.entertabs()
方法,代码如下
  1. import sched
  2. import time
  3. def say_hello():
  4. print("Hello, world!")
  5. scheduler = sched.scheduler(time.time, time.sleep)
  6. # 指定时间执行任务
  7. specific_time = time.time() + 5 # 距离现在的5秒钟之后执行
  8. scheduler.enterabs(specific_time, 1, say_hello, ())
  9. scheduler.run()

我们传入其中参数使其在指定的时间,也就是距离当下的5秒钟之后来执行任务

执行多个任务

这里仍然是调用

  1. enter()
方法来运行多个任务,代码如下
  1. import sched
  2. import time
  3. def task_one():
  4. print("Task One - Hello, world!")
  5. def task_two():
  6. print("Task Two - Hello, world!")
  7. scheduler = sched.scheduler(time.time, time.sleep)
  8. # 任务一在两秒钟只有执行
  9. scheduler.enter(2, 1, task_one, ())
  10. # 任务二在五秒钟之后运行
  11. scheduler.enter(5, 1, task_two, ())
  12. scheduler.run()

这里定义了两个函数,

  1. task_one
  1. task_two
里面分是同样的执行逻辑,打印出“Hello, world!”,然后
  1. task_one()
是在两秒钟之后执行而
  1. task_two()
则是在5秒钟之后执行,两者执行的优先级都是一样的。

以不同的优先级执行不同的任务

这回我们给

  1. task_one()
  1. task_two()
赋予不同的优先级,看一看执行的结果如下
  1. import sched
  2. import time
  3. def task_one():
  4. print("Task One - Hello, world!")
  5. def task_two():
  6. print("Task Two - Hello, world!")
  7. scheduler = sched.scheduler(time.time, time.sleep)
  8. # 优先级是1
  9. scheduler.enter(2, 2, task_one, ())
  10. # 优先级是2
  11. scheduler.enter(5, 1, task_two, ())
  12. scheduler.run()

output

Task One - Hello, world!
Task Two - Hello, world!

上述的代码会在停顿两秒之后运行

  1. task_one()
函数,再停顿3秒之后执行
  1. task_two()
函数

定时任务加上取消方法

我们给定时任务添加上取消的方法,代码如下

  1. import sched
  2. import time
  3. def task_one():
  4. print("Task One - Hello, world!")
  5. def task_two():
  6. print("Task Two - Hello, world!")
  7. scheduler = sched.scheduler(time.time, time.sleep)
  8. # 任务一在两秒钟只有执行
  9. task_one_event = scheduler.enter(2, 1, task_one, ())
  10. # 任务二在五秒钟之后运行
  11. task_two_event = scheduler.enter(5, 1, task_two, ())
  12. # 取消执行task_one
  13. scheduler.cancel(task_one_event)
  14. scheduler.run()

我们将两秒钟之后执行的

  1. task_one()
方法给取消掉,最后就只执行了
  1. task_two()
方法,也就打印出来“Task Two - Hello, world!”

执行备份程序

我们来写一个备份的脚本,在每天固定的时间将文件备份,代码如下

  1. import sched
  2. import time
  3. import shutil
  4. def backup_files():
  5. source = '路径/files'
  6. destination = '路径二'
  7. shutil.copytree(source, destination)
  8. def schedule_backup():
  9. # 创建新的定时器
  10. scheduler = sched.scheduler(time.time, time.sleep)
  11. # 备份程序在每天的1点来执行
  12. backup_time = time.strptime('01:00:00', '%H:%M:%S')
  13. backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ())
  14. # 开启定时任务
  15. scheduler.run()
  16. schedule_backup()

我们通过

  1. shutil
模块当中的
  1. copytree()
方法来执行拷贝文件,然后在每天的1点准时执行

执行定时分发邮件的程序

最后我们来执行定时分发邮件的程序,代码如下

  1. import sched
  2. import time
  3. import smtplib
  4. from email.mime.text import MIMEText
  5. def send_email(subject, message, from_addr, to_addr, smtp_server):
  6. # 邮件的主体信息
  7. email = MIMEText(message)
  8. email['Subject'] = subject
  9. email['From'] = from_addr
  10. email['To'] = to_addr
  11. # 发邮件
  12. with smtplib.SMTP(smtp_server) as server:
  13. server.send_message(email)
  14. def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time):
  15. # 创建定时任务的示例
  16. scheduler = sched.scheduler(time.time, time.sleep)
  17. # 定时邮件
  18. scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server))
  19. # 开启定时器
  20. scheduler.run()
  21. subject = 'Test Email'
  22. message = 'This is a test email'
  23. from_addr = 'test@example.com'
  24. to_addr = 'test@example.com'
  25. smtp_server = 'smtp.test.com'
  26. scheduled_time = time.time() + 60 # 一分钟之后执行程序
  27. send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)

以上就是Python怎么用sched模块实现定时任务的详细内容,更多关于Python怎么用sched模块实现定时任务的资料请关注九品源码其它相关文章!