While studying today I wrote a Flask search engine, including a crawler and a search page. Features include full-text search, pagination, and crawling.
Install the dependencies (besides flask and jieba, the crawler also needs requests and beautifulsoup4):
pip install flask jieba requests beautifulsoup4
Search engine backend:
from flask import Flask, render_template, request, session, jsonify
import sqlite3
import jieba
import math
import string

app = Flask(__name__)
DATABASE = 'data.db'

def create_database():
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    # Schema must match the table the crawler creates below
    c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
        title, url, favicon, description, keywords, date, img)''')
    conn.commit()
    conn.close()
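# Note: fts5 is an optional SQLite extension. It is compiled into most modern
# Python builds, but if the CREATE VIRTUAL TABLE above fails with
# "no such module: fts5", your sqlite3 library was built without it.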
def tokenize(title):
    words = [word for word in jieba.cut(title) if word not in string.punctuation]  # segment and drop punctuation
    keywords = [word for word in words if len(word) > 1]  # drop single characters
    keywords = list(set(keywords))  # deduplicate
    keywords.sort(key=words.index)  # keep the order of first appearance in the title
    keyword_str = ' '.join(keywords)  # join the keywords into a space-separated string
    keyword_str = ''.join(filter(lambda x: x not in string.punctuation, keyword_str))  # strip any remaining punctuation
    return keyword_str
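# Quick sanity check (the exact tokens depend on jieba's dictionary, so this
# output is only illustrative):
# >>> tokenize('Flask全文搜索引擎入门')
# 'Flask 全文 搜索引擎 入门'
# Caveat: string.punctuation covers only ASCII punctuation, so full-width
# Chinese punctuation such as "," passes through untouched.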
def search_contents(query, offset, per_page):
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("SELECT COUNT(*) FROM contents WHERE keywords MATCH :query",
              {'query': query})
    total_results = c.fetchone()[0]  # total number of matches
    total_pages = calculate_total_pages(total_results, per_page)
    if offset >= total_results:
        offset = max(0, (total_pages - 1) * per_page)  # clamp so an out-of-range page falls back to the last page
    c.execute("SELECT title, url, favicon, description, keywords, date FROM contents WHERE keywords MATCH :query LIMIT :per_page OFFSET :offset",
              {'query': query, 'per_page': per_page, 'offset': offset})
    rows = c.fetchall()
    conn.close()
    return {'results': [dict(row) for row in rows],
            'total_results': total_results,
            'total_pages': total_pages}
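# The space-separated string produced by tokenize() works as an FTS5 query
# because whitespace-separated terms are an implicit AND: a row matches only
# if it contains every term. So 'flask 教程' requires both "flask" and "教程".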
def calculate_total_pages(total_results, per_page):
    return math.ceil(total_results / per_page)
@app.before_request
def session_online():
    # Flask's session cookie is named 'session' by default; count one visit per
    # request that arrives with an existing session cookie
    session_id = request.cookies.get('session')
    online = session.get('Online', 0)
    if session_id is not None:
        online += 1
    session['Online'] = online
@app.route('/get_suggestions')
def get_suggestions():
    query = request.args.get('q', '')  # default to '' so a missing parameter cannot raise a TypeError
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    # Look up titles containing the typed keyword; return at most 5 suggestions
    c.execute("SELECT title FROM contents WHERE title LIKE ? LIMIT 5", ('%' + query + '%',))
    suggestions = [row[0] for row in c.fetchall()]
    conn.close()
    return jsonify(suggestions=suggestions)
@app.route('/', methods=['GET'])
def index():
    # Handle a search request
    query = request.args.get('q', '')  # search keyword, empty string by default
    page = request.args.get('page', 1, type=int)  # current page; type=int also guards against non-numeric values
    page = max(page, 1)  # guard against page=0 or negative pages
    per_page = 10  # results per page
    offset = (page - 1) * per_page  # row offset for this page
    online = session.get('Online', 0)
    if query:
        # Search the crawled page contents
        content_result = search_contents(tokenize(query), offset, per_page)
        return render_template('index.html',
                               query=query,
                               content_result=content_result['results'],
                               total_results=content_result['total_results'],
                               total_pages=content_result['total_pages'],
                               current_page=page,
                               online=online)
    else:
        return render_template('index.html',
                               online=online)
if __name__ == '__main__':
    create_database()
    app.secret_key = 'pyxueba'
    app.run(debug=True)
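A note on the request contract: searching is a plain GET, so a results page can be bookmarked or linked directly. With Flask's development server, for example:
http://127.0.0.1:5000/?q=flask&page=2
maps to query='flask' and page=2, so offset = (2 - 1) * 10 = 10 and the second batch of 10 results is returned.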
Search engine frontend (index.html; render_template loads it from the templates/ directory):
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Python学霸 Search Engine</title>
<link rel="icon" type="image/svg+xml" href="{{ url_for('static', filename='favicon.svg') }}"> <!-- assumes favicon.svg is placed in Flask's static/ directory -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 50px;
}
h1 {
font-size: 24px;
margin-bottom: 20px;
text-align: center;
}
.search-box {
margin-bottom: 20px;
text-align: center;
}
.search-box input[type="text"] {
padding: 6px 2px;
font-size: 16px;
border-radius: 4px;
border: 1px solid #999;
width: 40%;
max-width: 100%;
}
.search-box button[type="submit"] {
padding: 6px 12px;
font-size: 16px;
border-radius: 4px;
background-color: #006621;
color: #fff;
border: none;
cursor: pointer;
}
.search-box button[type="submit"]:hover {
background-color: #00511a;
}
.result-item {
margin-bottom: 20px;
border: 1px solid #ddd;
border-radius: 4px;
padding: 10px;
}
a {
text-decoration: none;
}
.result-title {
font-size: 20px;
font-weight: bold;
text-align: left;
}
.result-title a {
color: #008000;
}
.result-url {
color: #000000;
font-size: 14px;
margin-bottom: 5px;
}
.result-time {
font-size: 14px;
color: #999;
}
.result-description {
margin-top: 10px;
}
.pagination {
margin-top: 20px;
text-align: center;
}
.pagination-link {
display: inline-block;
padding: 6px 12px;
margin-right: 5px;
color: #333;
border-radius: 4px;
background-color: #f5f5f5;
text-decoration: none;
}
.pagination-link:hover {
background-color: #ddd;
}
.highlight {
background-color: #FFD700;
}
.footer {
margin-top: 50px;
text-align: center;
color: #999;
font-size: 12px;
}
.visitor-count {
margin-top: 10px;
}
.visitor-count span {
margin-left: 5px;
}
.favicon {
width: 16px;
height: 16px;
margin-right:3px;
}
</style>
</head>
<body>
<h1>Python学霸 Full-Text Search</h1>
<div class="search-box">
<form action="/" method="get">
<input type="text" name="q" id="search-input" list="suggestion-list" placeholder="You do the searching, we do the finding...">
<datalist id="suggestion-list"></datalist>
<button type="submit">Search</button>
</form>
</div>
{% if content_result %}
<p>Found {{ total_results }} results.</p>
{% for result in content_result %}
<div class="result-item">
<h2 class="result-title"><img src="{{ result.favicon }}" alt="Favicon" class="favicon"
style="border: 1px solid #ccc; border-radius: 5px;" /><a class="result-link" href="{{ result.url }}"
target="_blank">{{ result.title }}</a></h2>
<p class="result-url"><span class="time">{{ result.date }}</span> {{ result.description }}</p>
</div>
{% endfor %}
<div class="pagination">
{% if total_pages > 1 %}
{% for page in range(1, total_pages + 1) %}
{% if page == current_page %}
<a class="pagination-link highlight" href="/?q={{ query }}&page={{ page }}">{{ page }}</a>
{% else %}
<a class="pagination-link" href="/?q={{ query }}&page={{ page }}">{{ page }}</a>
{% endif %}
{% endfor %}
{% endif %}
</div>
{% endif %}
<div class="footer">
@2023 Python学霸.
<div class="visitor-count">
<p>Total visits: {{ online }}</p>
</div>
</div>
<script>
// Optional JavaScript: highlight the search keyword inside result titles
window.onload = function () {
    var query = "{{ query }}";
    // Escape regex metacharacters so a query like "c++" cannot break the RegExp
    var escaped = query.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    if (!escaped) return;
    var titles = document.getElementsByClassName("result-title");
    for (var i = 0; i < titles.length; i++) {
        var highlighted = titles[i].innerHTML.replace(new RegExp(escaped, "gi"), '<span class="highlight">$&</span>');
        titles[i].innerHTML = highlighted;
    }
};
</script>
<script type="text/javascript">
$(document).ready(function () {
    $('#search-input').on('input', function () {
        var query = $(this).val();
        if (query.trim().length > 0) { // ignore whitespace-only input
            $.ajax({
                url: '/get_suggestions',
                data: { q: query },
                success: function (response) {
                    var suggestions = response.suggestions;
                    var suggestionList = $('#suggestion-list');
                    suggestionList.empty(); // clear the previous suggestions
                    for (var i = 0; i < suggestions.length; i++) {
                        // A <datalist> expects <option> children; the browser renders
                        // the dropdown and fills the input on selection by itself,
                        // so no custom show/hide or click handling is needed.
                        suggestionList.append($('<option>').attr('value', suggestions[i]));
                    }
                }
            });
        } else {
            $('#suggestion-list').empty(); // no suggestions for empty input
        }
    });
});
</script>
</body>
</html>
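For reference, the suggestion round trip works like this (the titles below are invented; real output depends on what the crawler has stored). Typing into the search box fires GET /get_suggestions?q=fla, and the backend answers with JSON of the form
{"suggestions": ["Flask入门教程", "Flask Documentation"]}
The script above turns each entry into an <option> of the datalist, and the browser handles the dropdown natively.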
Crawler:
import requests
from bs4 import BeautifulSoup
import sqlite3
import jieba
import threading
import time
import random
import string
import re
from datetime import date
from urllib.parse import urljoin
import base64
class Crawler:
    def __init__(self, max_depth=3, num_workers=10):
        self.max_depth = max_depth
        self.num_workers = num_workers
        self.conn = sqlite3.connect('data.db', check_same_thread=False)
        self.lock = threading.Lock()
        self.url_queue = []
        self.crawled_urls = set()
        self.create_tables()
        self.add_urls(['https://www.hao123.com/'])
        self.run()

    def get_image_data_uri(self, image_url):
        # Fetch the favicon and inline it as a base64 data URI so the search
        # page does not have to hotlink the original file
        try:
            response = requests.get(image_url, timeout=5)
            response.raise_for_status()
        except requests.exceptions.RequestException:
            return ""
        base64_data = base64.b64encode(response.content).decode('utf-8')
        return f"data:image/x-icon;base64,{base64_data}"
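    # Illustrative call (the base64 payload here is made up):
    #   self.get_image_data_uri('https://example.com/favicon.ico')
    #   -> 'data:image/x-icon;base64,AAABAAEAEBA...'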
    def create_tables(self):
        c = self.conn.cursor()
        # Same schema as create_database() in the backend
        c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
            title, url, favicon, description, keywords, date, img)''')
        self.conn.commit()
    def add_urls(self, urls, depth=0):
        with self.lock:
            # Queue entries carry their depth so max_depth is actually honoured
            self.url_queue.extend((url, depth) for url in urls)
    def crawl_and_save(self, url, depth=0):
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            # Skip image links, javascript: pseudo-links, fragment links and empty URLs
            skip_tokens = ('.ico', '.jpg', '.png', 'javascript', '#')
            if url and not any(token in url for token in skip_tokens):
                response = requests.get(url, headers=headers, timeout=2.5)
                response.raise_for_status()
            else:
                print(f"Invalid URL: {url}")
                return
        except requests.exceptions.RequestException as e:
            print(f"Could not fetch {url}: {e}")
            return
        content_type = response.headers.get('content-type')
        if not content_type or not content_type.startswith('text/html'):
            return
        raw_html = response.content
        html_text = response.text
        soup = BeautifulSoup(raw_html, 'html.parser')
        title_tag = soup.title
        # soup.title.string is None when the <title> tag has child elements,
        # so guard against both a missing tag and a missing string
        if title_tag is None or not title_tag.string:
            print(f"No title found for {url}, skipping...")
            return
        title = title_tag.string.strip()
        if not title:
            print(f"Empty title for {url}, skipping...")
            return
        title2 = " ".join(jieba.cut(title))
        title2 = "".join(char for char in title2 if char not in string.punctuation)  # strip punctuation from the segmented title
        with self.lock:
            if url in self.crawled_urls:
                return
        date_regex = re.compile(r'\d{4}-\d{2}-\d{2}')  # assume dates look like YYYY-MM-DD
        date_match = date_regex.search(html_text)
        if date_match:
            shijian = date_match.group()
        else:
            # Fall back to a date/pubdate meta tag
            date_tag = soup.select_one('meta[name="date"], meta[name="pubdate"]')
            shijian = date_tag.get('content') if date_tag else None
        # Fall back to today's date when nothing was found
        if not shijian or shijian.strip() == '':
            shijian = str(date.today())
        try:
            keywords = self.extract_keywords(title2)
            description, favicon, img_urls = self.extract_page_info(soup)
            if favicon:
                # Favicon hrefs are often relative; resolve against the page URL
                favicon = self.get_image_data_uri(urljoin(url, favicon))
            c = self.conn.cursor()
            c.execute(
                "INSERT INTO contents(title, url, favicon, description, keywords, date, img) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (title, url, favicon, description, ",".join(keywords), shijian, "\n".join(img_urls)))
            self.conn.commit()
            self.crawled_urls.add(url)
            print(f"Crawled '{url}' and saved it to the database...")
        except sqlite3.IntegrityError:
            pass
        if depth < self.max_depth:
            links = soup.find_all('a', href=True)
            for link in links:
                # urljoin handles both absolute and relative hrefs correctly
                next_url = urljoin(url, link['href'])
                self.add_urls([next_url], depth + 1)  # enqueue one level deeper
    @staticmethod
    def extract_keywords(title):
        words = [word for word in jieba.cut(title) if word not in string.punctuation]  # segment and drop punctuation
        keywords = [word for word in words if len(word) > 0]  # drop empty tokens (single characters are kept here, unlike tokenize() in the backend)
        keywords = list(set(keywords))  # deduplicate
        keywords.sort(key=words.index)  # keep the order of first appearance in the title
        # keywords = keywords[:10]  # optionally keep only the first 10 keywords
        return keywords
    @staticmethod
    def extract_page_info(soup):
        description = ""
        favicon = ""
        meta_description = soup.find('meta', attrs={'name': 'description'})
        if meta_description and meta_description.has_attr('content'):
            description = meta_description['content']
        link_favicon = soup.find('link', attrs={'rel': 'icon'})
        if link_favicon and link_favicon.has_attr('href'):
            favicon = link_favicon['href']
        img_urls = [img.get('src') for img in soup.find_all('img')]
        img_urls = [img for img in img_urls if img is not None]
        return description, favicon, img_urls
    def worker(self):
        idle_rounds = 0
        while True:
            item = None
            with self.lock:
                if self.url_queue:
                    item = self.url_queue.pop(0)
            if item is None:
                # A momentarily empty queue does not mean the crawl is over:
                # another worker may still be fetching a page that will enqueue
                # more URLs. Wait a little before giving up.
                idle_rounds += 1
                if idle_rounds > 10:
                    break
                time.sleep(1)
                continue
            idle_rounds = 0
            url, depth = item
            # Random delay to be gentler on the crawled sites
            time.sleep(random.uniform(1, 3))
            self.crawl_and_save(url, depth)
    def run(self):
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        self.conn.close()
if __name__ == '__main__':
    crawler = Crawler(max_depth=5, num_workers=5)
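To put it all together (the file names crawler.py and app.py are my own labels; the post does not fix them), let the crawler populate data.db first, then start the Flask app:
python crawler.py
python app.py
Then open http://127.0.0.1:5000/ and search.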
There may still be a few bugs. The suggestion feature is already wired up; just make sure the datalist id in the HTML frontend matches the input's list attribute (suggestion-list, as above).
Source: python学霸