
本文介绍如何使用 Flask-APScheduler 在 Flask 服务内部实现每小时自动重爬目标 URL、更新数据并实时反映到网页,避免手动重启或外部调度依赖。
本文介绍如何使用 Flask-APScheduler 在 Flask 服务内部实现每小时自动重爬目标 URL、更新数据并实时反映到网页,避免手动重启或外部调度依赖。
在构建动态内容展示型 Flask 应用(如新闻轮播页、实时资讯看板)时,若后端数据源(如 CMS 页面)频繁更新,但前端又需保持服务长期运行,关键挑战在于:如何让内存中的数据随源头变化而自动刷新,同时确保每次 HTTP 请求获取的是最新结果。直接在全局作用域执行一次爬虫(如原代码中 soup = BeautifulSoup(...))会导致数据“冻结”——后续所有请求都返回初始快照,时间戳也不再更新。
解决思路是将数据获取逻辑封装为可复用的函数,并通过调度器周期性调用该函数,将结果存入线程安全的共享对象(如模块级变量或单例类属性)。以下是完整、可运行的优化方案:
✅ 推荐方案:Flask 内置调度(Flask-APScheduler)
相比 Windows 任务计划程序(需额外管理进程启停、状态同步)或纯 APScheduler(易与 Flask 生命周期冲突),Flask-APScheduler 是专为 Flask 设计的集成方案,自动处理应用上下文、信号绑定与异常恢复,更适合生产级轻量调度。
? 安装依赖
pip install flask flask-apscheduler beautifulsoup4 requests
? 完整可运行示例(适配你的新闻爬虫场景)
# app.py
from datetime import datetime
from flask import Flask, render_template
from flask_apscheduler import APScheduler
from bs4 import BeautifulSoup
import requests
# === 配置与初始化 ===
class Config:
SCHEDULER_API_ENABLED = False # 关闭内置 API(生产环境建议关闭)
app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# === 共享数据容器(线程安全)===
class NewsData:
titles = []
pictures = []
last_updated = datetime.now()
# === 核心爬虫函数(被调度器调用)===
def fetch_news():
try:
url = "https://news.clemson.edu/tag/extension/"
headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# 提取图片(data-src)
pictures = []
for img in soup.select('article img.lazyload'):
src = img.get('data-src')
if src and src.startswith('http'):
pictures.append(src)
# 提取标题
titles = []
for header in soup.select('article header'):
h3 = header.find("h3", class_="entry-title bold")
if h3 and h3.get_text(strip=True):
titles.append(h3.get_text(strip=True))
# 原子性更新共享数据
NewsData.titles = titles[:5] # 限制数量,防页面过长
NewsData.pictures = pictures[:5]
NewsData.last_updated = datetime.now()
print(f"[✓] 成功刷新 {len(titles)} 条新闻,时间:{NewsData.last_updated}")
except Exception as e:
print(f"[✗] 爬取失败:{e}")
# === 注册定时任务(每小时执行一次)===
@scheduler.task("interval", id="fetch_news_job", hours=1)
def scheduled_fetch():
fetch_news()
# === Flask 路由(始终返回最新数据)===
@app.route('/')
def home():
return render_template(
'home.html',
titles=NewsData.titles,
pictures=NewsData.pictures,
current_time=NewsData.last_updated.strftime("%d/%m/%Y %H:%M:%S")
)
# === 首次启动时预加载数据(避免首次访问空白)===
if __name__ == '__main__':
fetch_news() # 启动时立即抓一次
app.run(host='0.0.0.0', port=5000, debug=False) # 生产请改用 Gunicorn? 模板示例(templates/home.html)
<!DOCTYPE html>
<html>
<head><title>Clemson News Feed</title></head>
<body>
<h2>Latest News (Updated: {{ current_time }})</h2>
{% for title, pic in zip(titles, pictures) %}
<div style="margin: 20px 0;">
<img src="{{ pic }}" width="150" alt="{{ title }}">
<h3>{{ title }}</h3>
</div>
{% else %}
<p>No news available.</p>
{% endfor %}
</body>
</html>⚠️ 关键注意事项
- 线程安全:NewsData 类作为全局状态容器,fetch_news() 中的赋值是原子操作(列表/字符串替换),无需额外加锁;若涉及复杂状态(如字典更新),建议使用 threading.Lock。
- 错误隔离:爬虫异常不会中断调度器,try/except 确保单次失败不影响后续任务。
- 调试技巧:开发时可将 hours=1 临时改为 seconds=30 快速验证刷新效果。
- 生产加固:
- 添加 requests 超时(已含)、User-Agent(已含)、反爬应对(如随机延迟、Session 复用);
- 使用 gunicorn 替代 app.run() 启动多进程服务(注意 APScheduler 默认不跨进程,需改用 Redis 存储任务状态);
- 敏感 URL 或 Headers 建议移至配置文件或环境变量。
此方案实现了“一个进程、零外部依赖、数据实时生效”的闭环,既满足你每小时刷新的核心需求,又保持了代码简洁性与可维护性。