Python Scheduled Tasks: Automation Examples for Greeting, Backup, Monitoring, and More
This article presents a collection of Python code examples that demonstrate how to schedule recurring tasks such as printing greetings, backing up files, scraping data, cleaning caches, sending emails, monitoring servers, updating databases, uploading to cloud storage, executing scripts, and analyzing logs.
The following Python snippets illustrate how to implement common scheduled automation tasks using simple loops and the time module, suitable for backend operations and system maintenance.
Timed greeting
import time
def greet():
print("Hello, it's time for your daily reminder!")
# 每隔5秒打印一次问候语
while True:
greet()
time.sleep(5)Timed file backup
import os
import shutil
import time
def backup_file(source, destination):
shutil.copy2(source, destination)
print(f"File {source} has been backed up to {destination}")
# 每天凌晨2点备份文件
backup_time = 2 * 3600 # 2 hours after midnight
while True:
current_time = time.localtime().tm_hour * 3600 + time.localtime().tm_min * 60 + time.localtime().tm_sec
if current_time >= backup_time:
backup_file("/path/to/source", "/path/to/destination")
time.sleep(24 * 3600) # Sleep for one day
else:
time.sleep(60) # Check every minuteTimed web data fetch
import requests
import time
from bs4 import BeautifulSoup
def fetch_data(url):
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
data = soup.find('some', 'tag')
print(data.text.strip())
# 每小时抓取一次数据
while True:
fetch_data("http://example.com/data")
time.sleep(3600) # Sleep for one hourTimed cache cleaning
import os
import time
def clean_cache(directory):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
os.remove(file_path)
print(f"Removed {file_path}")
# 每周清理一次缓存
clean_time = 7 * 24 * 3600 # 7 days
last_clean = 0
while True:
if time.time() - last_clean > clean_time:
clean_cache("/path/to/cache/directory")
last_clean = time.time()
time.sleep(60) # Check every minuteTimed email reminder
import smtplib
import time
from email.mime.text import MIMEText
def send_email(subject, body, to_addr):
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = '[email protected]'
msg['To'] = to_addr
server = smtplib.SMTP('smtp.example.com', 587)
server.starttls()
server.login("[email protected]", "your-password")
server.sendmail('[email protected]', [to_addr], msg.as_string())
server.quit()
print(f"Email sent to {to_addr}")
# 每天早上9点发送邮件
email_time = 9 * 3600 # 9 hours after midnight
while True:
current_time = time.localtime().tm_hour * 3600 + time.localtime().tm_min * 60 + time.localtime().tm_sec
if current_time >= email_time:
send_email("Daily Reminder", "This is your daily reminder!", "[email protected]")
time.sleep(24 * 3600) # Sleep for one day
else:
time.sleep(60) # Check every minuteTimed server status monitoring
import requests
import time
def check_server_status(url):
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"Server {url} is UP")
else:
print(f"Server {url} responded with status code {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Failed to connect to {url}: {e}")
# 每10分钟检查一次
while True:
check_server_status("http://example.com/status")
time.sleep(600) # Sleep for 10 minutesTimed database record update
import pymysql
import time
def update_record(db, table, column, value, condition):
conn = pymysql.connect(host='localhost', user='root', password='', db=db)
with conn.cursor() as cursor:
cursor.execute(f"UPDATE {table} SET {column} = %s WHERE {condition}", (value,))
conn.commit()
print(f"Updated record in {table}")
conn.close()
# 每天晚上11点更新记录
update_time = 23 * 3600 # 23 hours after midnight
while True:
current_time = time.localtime().tm_hour * 3600 + time.localtime().tm_min * 60 + time.localtime().tm_sec
if current_time >= update_time:
update_record("mydb", "users", "status", "active", "id = 1")
time.sleep(24 * 3600) # Sleep for one day
else:
time.sleep(60) # Check every minuteTimed upload to cloud storage (AWS S3)
import boto3
import time
def upload_to_s3(file_path, bucket_name, object_name):
s3 = boto3.client('s3')
s3.upload_file(file_path, bucket_name, object_name)
print(f"File {file_path} uploaded to S3 bucket {bucket_name}")
# 每天下午3点上传文件
upload_time = 15 * 3600 # 15 hours after midnight
while True:
current_time = time.localtime().tm_hour * 3600 + time.localtime().tm_min * 60 + time.localtime().tm_sec
if current_time >= upload_time:
upload_to_s3("/path/to/local/file", "my-bucket", "remote/path/to/file")
time.sleep(24 * 3600) # Sleep for one day
else:
time.sleep(60) # Check every minuteTimed script execution
import subprocess
import time
def run_script(script_path):
subprocess.run(["python", script_path])
print(f"Script {script_path} has been executed")
# 每天中午12点执行脚本
run_time = 12 * 3600 # 12 hours after midnight
while True:
current_time = time.localtime().tm_hour * 3600 + time.localtime().tm_min * 60 + time.localtime().tm_sec
if current_time >= run_time:
run_script("/path/to/script.py")
time.sleep(24 * 3600) # Sleep for one day
else:
time.sleep(60) # Check every minuteTimed log file analysis
import time
import re
def analyze_log(log_file):
pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - \[(.*?)\] "(GET|POST) (.*?) HTTP.*?" (\d{3}) (\d+)'
with open(log_file, 'r') as file:
content = file.read()
matches = re.findall(pattern, content)
for match in matches:
ip, timestamp, method, path, status, size = match
print(f"{ip} accessed {path} at {timestamp} with status {status}")
# 每小时统计一次
while True:
analyze_log("/path/to/logfile.log")
time.sleep(3600) # Sleep for one hourThese examples can be adapted to run as background services, cron jobs, or integrated into larger automation frameworks.
Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.