Flask Extreme Challenge: Building a Full-Text Search Engine

   While studying today I wrote a Flask search engine, including a crawler and a search page. It supports full-text search, pagination, crawling, and more.

The search app itself only needs flask and jieba; the crawler additionally uses requests and beautifulsoup4:

pip install flask jieba requests beautifulsoup4


Search engine backend:

from flask import Flask, render_template, request, session, jsonify
import sqlite3
import jieba
import math
import string

app = Flask(__name__)
DATABASE = 'data.db'


def create_database():
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    # Must match the schema the crawler creates; FTS5 indexes every column
    c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
        title, url, favicon, description, keywords, date, img)''')
    conn.commit()
    conn.close()


def tokenize(title):
    # Segment with jieba and drop pure-punctuation tokens
    words = [word for word in jieba.cut(title) if word not in string.punctuation]
    keywords = [word for word in words if len(word) > 1]  # drop single characters
    keywords = list(set(keywords))  # deduplicate
    keywords.sort(key=words.index)  # restore original order of appearance
    keyword_str = ' '.join(keywords)  # space-separated FTS5 query string
    keyword_str = ''.join(filter(lambda x: x not in string.punctuation, keyword_str))  # strip punctuation embedded in tokens
    return keyword_str


def search_contents(query, offset, per_page):
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("SELECT COUNT(*) FROM contents WHERE keywords MATCH :query",
              {'query': query})
    total_results = c.fetchone()[0]  # 获取搜索结果总数
    total_pages = calculate_total_pages(total_results, per_page)  # 计算总页数
    if offset >= total_results:
        offset = (total_pages - 1) * per_page
    c.execute("SELECT title, url, favicon, description, keywords, date FROM contents WHERE keywords MATCH :query LIMIT :per_page OFFSET :offset",
              {'query': query, 'per_page': per_page, 'offset': offset})
    rows = c.fetchall()
    conn.close()
    return {'results': [dict(row) for row in rows],
            'total_results': total_results,
            'total_pages': total_pages}


def calculate_total_pages(total_results, per_page):
    return math.ceil(total_results / per_page)


@app.before_request
def session_online():
    # Flask's session cookie is named 'session' by default, not 'session_id'
    session_id = request.cookies.get('session')
    online = session.get('Online', 0)
    if session_id is not None:
        online += 1
    session['Online'] = online


@app.route('/get_suggestions')
def get_suggestions():
    query = request.args.get('q', '')

    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()

    # Look up titles containing the typed keyword; return at most 5 suggestions
    c.execute("SELECT title FROM contents WHERE title LIKE ? LIMIT 5", ('%' + query + '%',))
    suggestions = [row[0] for row in c.fetchall()]

    conn.close()

    return jsonify(suggestions=suggestions)


@app.route('/', methods=['GET'])
def index():
    # Handle a search request
    query = request.args.get('q', '')  # search keyword, empty by default
    page = request.args.get('page', 1, type=int)  # current page, defaults to 1
    per_page = 10  # results per page
    offset = (page - 1) * per_page  # row offset for the current page
    online = session.get('Online', 0)
    if query:
        # Search the indexed page contents
        content_result = search_contents(tokenize(query), offset, per_page)

        return render_template('index.html',
                               query=query,
                               content_result=content_result['results'],
                               total_results=content_result['total_results'],
                               total_pages=content_result['total_pages'],
                               current_page=page,
                               online=online)
    else:
        return render_template('index.html',
                               online=online)


if __name__ == '__main__':
    create_database()
    app.secret_key = 'pyxueba'
    app.run(debug=True)
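
To try the search page before the crawler has collected anything, you can seed data.db with a sample row by hand. A minimal sketch, assuming the FTS5 schema above; the row values are invented purely for illustration:

import sqlite3
from datetime import date

conn = sqlite3.connect('data.db')
c = conn.cursor()
c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
    title, url, favicon, description, keywords, date, img)''')
# A made-up sample row, just so the search page has something to return
c.execute("INSERT INTO contents(title, url, favicon, description, keywords, date, img) "
          "VALUES (?, ?, ?, ?, ?, ?, ?)",
          ('Python 学霸 Flask tutorial', 'https://example.com/', '',
           'A sample page for testing the search box.',
           'Python 学霸 Flask tutorial', str(date.today()), ''))
conn.commit()

# Verify that the FTS5 index matches a keyword
c.execute("SELECT title, url FROM contents WHERE contents MATCH ?", ('Flask',))
print(c.fetchall())
conn.close()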

Search engine frontend:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Python学霸 Search Engine</title>
    <link rel="icon" type="image/svg+xml" href="favicon.svg">
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 50px;
        }

        h1 {
            font-size: 24px;
            margin-bottom: 20px;
            text-align: center;
        }

        .search-box {
            margin-bottom: 20px;
            text-align: center;
        }

        .search-box input[type="text"] {
            padding: 6px 2px;
            font-size: 16px;
            border-radius: 4px;
            border: 1px solid #999;
            width: 40%;
            max-width: 100%;
        }

        .search-box button[type="submit"] {
            padding: 6px 12px;
            font-size: 16px;
            border-radius: 4px;
            background-color: #006621;
            color: #fff;
            border: none;
            cursor: pointer;
        }

        .search-box button[type="submit"]:hover {
            background-color: #00511a;
        }

        .result-item {
            margin-bottom: 20px;
            border: 1px solid #ddd;
            border-radius: 4px;
            padding: 10px;
        }

        a {
            text-decoration: none;
        }

        .result-title {
            font-size: 20px;
            font-weight: bold;
            text-align: left;
        }

        .result-title a {
            color: #008000;
        }

        .result-url {
            color: #000000;
            font-size: 14px;
            margin-bottom: 5px;
        }

        .result-time {
            font-size: 14px;
            color: #999;
        }

        .result-description {
            margin-top: 10px;
        }

        .pagination {
            margin-top: 20px;
            text-align: center;
        }

        .pagination-link {
            display: inline-block;
            padding: 6px 12px;
            margin-right: 5px;
            color: #333;
            border-radius: 4px;
            background-color: #f5f5f5;
            text-decoration: none;
        }

        .pagination-link:hover {
            background-color: #ddd;
        }

        .highlight {
            background-color: #FFD700;
        }

        .footer {
            margin-top: 50px;
            text-align: center;
            color: #999;
            font-size: 12px;
        }

        .visitor-count {
            margin-top: 10px;
        }

        .visitor-count span {
            margin-left: 5px;
        }

.favicon {
  width: 16px;
  height: 16px;
  margin-right:3px;
}
</style>
</head>

<body>
    <h1>Python学霸 Full-Text Search</h1>

    <div class="search-box">
        <form action="/" method="get">
            <input type="text" name="q" id="search-input" list="suggestion-list" placeholder="你负责搜,我负责找···">
            <datalist id="suggestion-list--------" class="suggestion-list------"></datalist>
            <button type="submit">搜索</button>
        </form>
    </div>

    {% if content_result %}
    <p>Found {{ total_results }} results.</p>
    {% for result in content_result %}
    <div class="search-summary">
    </div>
    <div class="result-item">
        <h2 class="result-title"><img src="{{ result.favicon }}" alt="Favicon" class="favicon"
                style="border: 1px solid #ccc; border-radius: 5px;" /><a class="result-link" href="{{ result.url }}"
                target="_blank">{{ result.title }}</a></h2>
        <p class="result-url"><span class="time">{{ result.date }}</span> {{ result.description }}</p>
    </div>
    {% endfor %}

    <div class="pagination">
        {% if total_pages > 1 %}
        {% for page in range(1, total_pages + 1) %}
        {% if page == current_page %}
        <a class="pagination-link highlight" href="/?q={{ query }}&amp;page={{ page }}">{{ page }}</a>
        {% else %}
        <a class="pagination-link" href="/?q={{ query }}&amp;page={{ page }}">{{ page }}</a>
        {% endif %}
        {% endfor %}
        {% endif %}
    </div>
    {% endif %}

    <div class="footer">
       © 2023 Python学霸.
        <div class="visitor-count">
            <p>Total visits: {{ online }}</p>
        </div>
    </div>

    <script>
        // Optional JavaScript: highlight the search keyword inside result titles
        window.onload = function () {
            var query = {{ query|default('')|tojson }};
            if (!query) return;
            // Escape regex metacharacters before building the pattern
            var escaped = query.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
            var titles = document.getElementsByClassName("result-title");
            for (var i = 0; i < titles.length; i++) {
                var title = titles[i];
                title.innerHTML = title.innerHTML.replace(new RegExp(escaped, "gi"), '<span class="highlight">$&</span>');
            }
        };
    </script>
    <script type="text/javascript">
        $(document).ready(function () {
            $('#search-input').on('input', function () {
                var query = $(this).val();
                if (query.trim().length > 0) { // 确保输入不是空白字符
                    $.ajax({
                        url: '/get_suggestions',
                        data: { q: query },
                        success: function (response) {
                            var suggestions = response.suggestions;
                            var suggestionList = $('#suggestion-list');
                            suggestionList.empty(); // 清空之前的建议列表
                            for (var i = 0; i < suggestions.length; i++) {
                                var suggestionItem = $('<li>').text(suggestions[i]);
                                suggestionList.append(suggestionItem);
                            }
                            suggestionList.show(); // 显示建议列表
                        }
                    });
                } else {
                    $('#suggestion-list').empty().hide(); // 输入为空时隐藏建议列表
                }
            });

            // 当用户点击建议项时将其填充到搜索框中
            $('#suggestion-list').on('click', 'li', function () {
                var selectedSuggestion = $(this).text();
                $('#search-input').val(selectedSuggestion);
                $('#suggestion-list').empty().hide(); // 填充后隐藏建议列表
            });
        });
</script>
</body>
</html>
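
Once the app is running, the suggestion endpoint can be sanity-checked on its own. A quick sketch, assuming the development server is listening on Flask's default 127.0.0.1:5000:

import requests

# Call the autocomplete endpoint that the datalist script uses
resp = requests.get('http://127.0.0.1:5000/get_suggestions', params={'q': 'python'})
print(resp.json())  # e.g. {"suggestions": ["...", "..."]}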

Crawler:

import requests
from bs4 import BeautifulSoup
import sqlite3
import jieba
import threading
import time
import random
import string
import re
from datetime import date
from urllib.parse import urljoin
import base64
class Crawler:
    def get_image_data_uri(self, image_url):
        # Download the favicon and inline it as a base64 data URI
        response = requests.get(image_url, timeout=5)
        image_data = response.content

        base64_data = base64.b64encode(image_data).decode('utf-8')

        return f"data:image/x-icon;base64,{base64_data}"
    def __init__(self, max_depth=3, num_workers=10):
        self.max_depth = max_depth
        self.num_workers = num_workers
        self.conn = sqlite3.connect('data.db', check_same_thread=False)
        self.lock = threading.Lock()
        self.url_queue = []  # queue of (url, depth) pairs
        self.crawled_urls = set()

        self.create_tables()
        self.add_urls([('https://www.hao123.com/', 0)])
        self.run()

    def create_tables(self):
        c = self.conn.cursor()
        # Same FTS5 schema the Flask backend expects
        c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
            title, url, favicon, description, keywords, date, img)''')
        self.conn.commit()

    def add_urls(self, urls):
        # urls is a list of (url, depth) pairs
        with self.lock:
            self.url_queue.extend(urls)

    def crawl_and_save(self, url, depth=0):
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            # Skip images, javascript: pseudo-links, fragment links and empty URLs
            skip_markers = ('.ico', '.jpg', '.png', 'javascript', '#')
            if url and not any(marker in url for marker in skip_markers):
                response = requests.get(url, headers=headers, timeout=2.5)
                response.raise_for_status()
            else:
                print(f"Skipping invalid URL: {url}")
                return
        except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e:
            print(f"Failed to fetch {url}: {e}")
            return

        content_type = response.headers.get('content-type')
        if not content_type or not content_type.startswith('text/html'):
            return

        raw_html = response.content
        html_text = response.text
        soup = BeautifulSoup(raw_html, 'html.parser')
        title_tag = soup.title
        if title_tag is None:
            print(f"No title found for {url}, skipping...")
            return
        # get_text() is safer than .string, which is None when <title> has nested tags
        title = title_tag.get_text().strip()
        if not title:
            print(f"Empty title for {url}, skipping...")
            return
        title2 = " ".join(jieba.cut(title))  # jieba-segmented title
        title2 = "".join(char for char in title2 if char not in string.punctuation)  # strip punctuation from the segmented string

        with self.lock:
            if url in self.crawled_urls:
                return

            # Look for a YYYY-MM-DD date anywhere in the page text
            date_regex = re.compile(r'\d{4}-\d{2}-\d{2}')
            date_match = date_regex.search(html_text)
            if date_match:
                page_date = date_match.group()
            else:
                # Fall back to meta tags that carry a publication date
                date_tag = soup.select_one('meta[name="date"], meta[name="pubdate"]')
                page_date = date_tag.get('content') if date_tag else None

            # If no date was found, use today's date
            if not page_date or page_date.strip() == '':
                page_date = str(date.today())
            print(page_date)
            try:
                keywords = self.extract_keywords(title2)
                description, favicon, img_urls = self.extract_page_info(soup)
                if favicon:
                    # Resolve relative favicon paths before downloading
                    favicon = self.get_image_data_uri(urljoin(url, favicon))
                c = self.conn.cursor()
                c.execute(
                    "INSERT INTO contents(title, url, favicon, description, keywords, date, img) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (title, url, favicon, description, ",".join(keywords), page_date, "\n".join(img_urls)))
                self.conn.commit()
                self.crawled_urls.add(url)
                print(f"Crawled '{url}' and saved it to the database...")
            except (sqlite3.IntegrityError, requests.exceptions.RequestException):
                # Skip the row if the favicon download fails; duplicate URLs
                # are already filtered through the crawled_urls set
                pass

        if depth < self.max_depth:
            links = soup.find_all('a', href=True)
            for link in links:
                # urljoin resolves relative links against the current page
                next_url = urljoin(url, link['href'])
                self.add_urls([(next_url, depth + 1)])  # enqueue with incremented depth

    @staticmethod
    def extract_keywords(title):
        # Segment with jieba and drop pure-punctuation tokens
        words = [word for word in jieba.cut(title) if word not in string.punctuation]
        keywords = [word for word in words if len(word) > 0]  # keep non-empty tokens
        keywords = list(set(keywords))  # deduplicate
        keywords.sort(key=words.index)  # restore original order of appearance
        # keywords = keywords[:10]  # optionally keep only the first 10 keywords
        return keywords

    @staticmethod
    def extract_page_info(soup):
        description = ""
        favicon = ""
        img_urls = []

        meta_description = soup.find('meta', attrs={'name': 'description'})
        if meta_description and meta_description.has_attr('content'):
            description = meta_description['content']

        link_favicon = soup.find('link', attrs={'rel': 'icon'})
        if link_favicon and link_favicon.has_attr('href'):
            favicon = link_favicon['href']

        img_links = soup.find_all('img')
        img_urls = [img.get('src') for img in img_links]
        img_urls = [img for img in img_urls if img is not None]

        return description, favicon, img_urls

    def worker(self):
        idle_rounds = 0
        while True:
            url, depth = None, 0
            with self.lock:
                if self.url_queue:
                    url, depth = self.url_queue.pop(0)

            if url is None:
                # Don't exit on a momentarily empty queue: other workers may
                # still be crawling and about to enqueue newly discovered links
                idle_rounds += 1
                if idle_rounds > 5:
                    break
                time.sleep(1)
                continue
            idle_rounds = 0

            # Random delay to stay polite to the crawled sites
            delay = random.uniform(1, 3)
            time.sleep(delay)

            self.crawl_and_save(url, depth)

    def run(self):
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

        self.conn.close()


if __name__ == '__main__':
    crawler = Crawler(max_depth=5, num_workers=5)
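
After a crawl you can inspect what was stored directly from Python. A small sketch, assuming data.db sits in the working directory:

import sqlite3

conn = sqlite3.connect('data.db')
c = conn.cursor()
# Count indexed pages and peek at a few rows
c.execute("SELECT COUNT(*) FROM contents")
print("pages indexed:", c.fetchone()[0])
c.execute("SELECT title, url, date FROM contents LIMIT 5")
for title, url, page_date in c.fetchall():
    print(page_date, title, url)
conn.close()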

  There may still be a few bugs. The search-suggestion feature is already wired in; it relies on the suggestion-list datalist id matching what the input's list attribute and the script reference.
 
Source: python学霸
