DC's blog DC's blog
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)

DC

愿我一生欢喜,不为世俗所及.
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)
  • Django

  • 第一次学drf

  • 第二次学drf

  • 温故知新

  • flask

    • flask使用

    • flask源码

      • 面试题
      • 快速使用
      • 数据库链接池
        • 基础版
        • 改进版
        • 并发版
          • 知识点-数据库连接池
          • flask使用池 - 函数
          • flask使用池 - 类
          • 单例模式 - 模块对象
          • 基于类实现sqlhelper
          • flask使用池 - 上下文管理
          • with上下文
          • 基于上下文实现sqlhelper
      • flask的使用
      • flask上下文管理
      • 使用简记
  • 后端
  • flask
  • flask源码
DC
2024-09-23
目录

数据库链接池

这个知识点, 在前面的博客总结过.. 参考以往博客:
-1- mysql基础 - pymysql操作, 写了个示例, 将csv文件的内容录入mysql数据库中..
-2- mysql进阶 - other.md - 数据库链接池、sqlhelper、上下文

☆ 老板说,写一个简单脚本,将数据库里的数据导入excel中! -- 利于pymsql就可以实现了!因为该需求不存在什么并发.
☆ 若写Flask程序,flask里没有自带ORM,需要用原生的sql,不出意外都要使用数据库连接池!!

☆ 数据库连接池没必要用上下文来实现,基于类实现的方案就够用了!!


# 基础版

import pymysql
from flask import Flask

app = Flask(__name__)


def fetchall(sql):
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='admin1234', db='order')
    cursor = conn.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    cursor.close()
    conn.close()
    return result

@app.route('/login')
def login():
    result = fetchall('select * from web_level')
    return f'login {result}'


@app.route('/index')
def index():
    result = fetchall('select * from web_level')
    return f'index {result}'


@app.route('/order')
def order():
    result = fetchall('select * from web_level')
    return f'order {result}'


if __name__ == '__main__':
    app.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

image-20240923121517534

基础版有个弊端, 每次执行sql语句都要经历 连接一次db,进行操作, 关闭连接.. 的过程..
可以是可以, 但效率很低!! 我们想, 执行的那些sql能否都用同一个连接?/ 保持一个连接?


# 改进版

其实就是将 数据库连接的代码提取了出来作为类变量, 把连接关闭的代码注释掉了..

import pymysql
from flask import Flask

app = Flask(__name__)

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='admin1234', db='order')

def fetchall(sql):
    cursor = conn.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    cursor.close()
    # conn.close()
    return result

@app.route('/login')
def login():
    result = fetchall('select * from web_level')
    return f'login {result}'


@app.route('/index')
def index():
    result = fetchall('select * from web_level')
    return f'index {result}'


@app.route('/order')
def order():
    result = fetchall('select * from web_level')
    return f'order {result}'


if __name__ == '__main__':
    app.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

若是并发请求, 同时来了n个请求呢? 这里的一个连接是处理不了这种情况的!


# 并发版

# 知识点-数据库连接池

pip3 install dbutils
pip3 install pymysql

import pymysql
from dbutils.pooled_db import PooledDB

MYSQL_DB_POOL = PooledDB(
    # -- 使用链接数据库的模块
    creator=pymysql,
    # -- 连接池允许的最大连接数,0和None表示不限制连接数
    maxconnections=6,
    # -- 初始化时/程序运行时,链接池中至少创建的空闲的链接,0表示不创建
    #    最开始的时候没有那么多请求的到来,没必要在内部创建6个连接 "6是上面maxconnections的值"
    #    所以最小创建连接数,最开始的时候创建两个,等着用,后面不够用了,再自动去增加.
    mincached=2,
    # -- 链接池中最多闲置的链接,0和None不限制
    #    一开始连接池有两个连接,突然用户猛增连接池满了,过了一会用户量又少了,那么过段时间就要对其进行回收.
    # maxcached=5,
    # -- 该参数值无论设置为多少,生效的只会是0! 不用设置!
    #    连接池中最多共享的连接数量.0和None表示全部共享!
    # maxshared=3,
    # -- 连接池中如果没有可用连接,是否阻塞等待. True等待、False不等待然后报错.
    #    eg: 同时来7个人,有个人就得等.. 因为maxconnections=6
    blocking=True,
    # -- 一个连接最多被重复使用的次数. None表示无限制! 一般都设置为None!
    # maxusage=None,
    # -- 开始会话(连接)前执行的命令列表. 如: ["set datestyle to ...", "set time zone ..."]
    # setsession=[],
    # -- 使用连接前先ping下MySQL服务端,检查mysql服务是否可用 该参数值通常设置为0
    #    0 = None = never 不用ping; 1 = default = whenever it is requested 当用连接时ping
    #    2 = when a cursor is created; 4 = when a query is executed ; 7 = always
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='admin1234',
    database='order',
    charset='utf8'
)


def task(num):
    conn = MYSQL_DB_POOL.connection()  # -- 去连接池获取一个连接

    cursor = conn.cursor()
    # cursor.execute('select * from web_level')
    cursor.execute('select sleep(3)')
    result = cursor.fetchall()

    cursor.close()
    # -- ★ 并不是关闭连接 而是将连接交还给数据库连接池
    conn.close()

    print(num, '------------>', result)


def run():
    from threading import Thread
    for i in range(18):
        t = Thread(target=task, args=(i,))
        t.start()


if __name__ == '__main__':
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

你注意观察运行结果,就是6个6个来的!

56

# flask使用池 - 函数

"""
sqlhelper.py 基于函数实现sqlhelper
"""
import pymysql
from dbutils.pooled_db import PooledDB

POOL = PooledDB(
    creator=pymysql,
    maxconnections=6,
    mincached=2,
    blocking=True,
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='admin1234',
    database='order',
    charset='utf8'
)


def fetchall(sql, *args):
    """ 获取所有数据 """
    conn = POOL.connection()
    # 配置了DictCursor后,fetchall获取的每一项的结果是字典!否则为元祖!
    # cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor = conn.cursor()
    cursor.execute(sql, args)
    # 是否配置 DictCursor
    # [{'id': 1, 'active': 1, 'title': 'VIP', 'percent': 90}, 
    #  {'id': 2, 'active': 1, 'title': '金牌', 'percent': 80}]
    # ((1, 1, 'VIP', 90), (2, 1, '金牌', 80))
    result = cursor.fetchall()
    cursor.close()
    conn.close()

    return result


def fetchone(sql, *args):
    """ 获取单条数据 """
    conn = POOL.connection()
    cursor = conn.cursor()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    cursor.close()
    conn.close()

    return result
  
  
"""
flask程序
"""
from flask import Flask
import sqlhelper  # 导入我们写的 函数式 sqlhelper

app = Flask(__name__)


@app.route('/login')
def login():
    result = sqlhelper.fetchone('select * from web_level where active=%s and percent=%s ', '1', '80')
    return f'login - {result}'


@app.route('/index')
def index():
    result = sqlhelper.fetchall('select * from web_level')
    return f'index - {result}'


if __name__ == '__main__':
    app.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

image-20240923140843812

# flask使用池 - 类

# 单例模式 - 模块对象

Python的模块就是天然的单例模式..
因为模块在第一次导入时, 会生成 .pyc 文件, 当第二次导入时, 就会直接加载 .pyc 文件, 而不会再次执行模块代码.
因此, 我们只需把相关的函数和数据定义在一个模块中, 就可以获得一个单例对象了.
如果我们真的想要一个单例类, 可以考虑这样做:

"""
mysingleton.py
"""
class Singleton:
    def foo(self):
        pass

singleton = Singleton()  

"""
在其它文件导入这个对象即可!
"""
from mysingleton import singleton
1
2
3
4
5
6
7
8
9
10
11
12
13
# 基于类实现sqlhelper
"""
sqlhelper.py 基于类实现sqlhelper
"""
import pymysql
from dbutils.pooled_db import PooledDB


class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=6,
            mincached=2,
            blocking=True,
            ping=0,
            host='127.0.0.1',
            port=3306,
            user='root',
            password='admin1234',
            database='order',
            charset='utf8'
        )

    def open(self):
        conn = self.pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()

    def fetchall(self, sql, *args):
        """ 获取所有数据 """
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result

    def fetchone(self, sql, *args):
        """ 获取单条数据 """
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result

    def exec(self, sql, *args):
        """增加、更新、删除时使用该方法!"""
        conn, cursor = self.open()
        cursor.execute(sql, args)
        conn.commit()
        self.close(conn, cursor)


db = SqlHelper()


"""
flask程序
"""
from flask import Flask
from sqlhelper import db

app = Flask(__name__)


@app.route('/one')
def one_d():
    result = db.fetchone('select * from web_level where active=%s and percent=%s ', '1', '80')
    return f'login - {result}'


@app.route('/all')
def all_d():
    result = db.fetchall('select * from web_level')
    return f'index - {result}'


@app.route('/exec')
def exec_d():
    db.exec('update web_level set active=1 where percent=80;')
    return 'xxx'


@app.route('/other')
def other():
    conn, cursor = db.open()
    # ★ To do:自己做一些sql操作
    db.close(conn, cursor)
    return 'xxx'


if __name__ == '__main__':
    app.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

# flask使用池 - 上下文管理

# with上下文
class Context:
    def __init__(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def do_something(self):
        pass


if __name__ == '__main__':
    """ ★★★
    with语句,先自动执行obj所在类的__enter__方法,该方法返回啥,ctx就是啥!!
    当执行完with语句体代码时,即结束时会自动执行obj所在类的__exit__方法!!
    >>> __enter__返回啥,ctx就是啥, 那么若__enter__返回self,那么ctx就是Connect()这个实例对象!!
    """
    obj = Context()
    with obj as ctx:
        ctx.do_something()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

再来看个例子, 在该例子中, ctx是Foo的实例对象!!

class Foo:
    def do_something(self):
        pass

    def close(self):
        pass


# 使用context去管理Foo类的执行和关闭
class Context:
    def __init__(self):
        self.data = Foo()

    def __enter__(self):
        # self.data = Foo() 写在这一样的
        return self.data

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.data.close()


if __name__ == '__main__':
    obj = Context()
    with obj as ctx:
        ctx.do_something()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

上下文的应用: open打开文件、关闭文件; 申请锁关闭锁..

# 基于上下文实现sqlhelper

第一点,没必要这样搞!! 复杂化了,基于类实现sqlhelper就够用啦!!
第二点,下面截图的代码 线程会报错!! 哈哈哈哈.. 如何解决? 暂且不论, 在后续flask的上下文源码中可以找到!!

image-20240924083251538


快速使用
flask的使用

← 快速使用 flask的使用→

最近更新
01
deepseek本地部署+知识库
02-17
02
实操-微信小程序
02-14
03
教学-cursor深度探讨
02-13
更多文章>
Theme by Vdoing | Copyright © 2023-2025 DC | One Piece
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式