SQLAlchemy

官网地址:https://docs.sqlalchemy.org/en/13/

安装

1
2
pip install SQLAlchemy
pip install mysqlclient
  • 版本检查
1
2
3
>>> import sqlalchemy
>>> sqlalchemy.__version__ 
1.3.0

创建核心引擎

create_engine()创建了一个针对某种数据库的核心接口,所有数据库的操作我们都将可以使用python语法去操作啦!

1
2
3
4
5
from sqlalchemy import create_engine
engine = create_engine("mysql://root:123456@localhost:3306/test?charset=utf8",echo=True)

# 参数1:连接数据库的url地址
# 参数2: echo 设置为True,则输出日志,否则不输出

声明映射

正如刚才我们所描述的,所有的数据库操作都可以使用python语法来调用啦! 接下来,我们若想创建一张表的话,我们只需要创建一个python类就好啦!

根据不同的数据库,我们需要使用不同的基类!

SQLAlchemy提供了这样一个方法帮我们干这个事情:declarative_base(),它维系着python类和数据库表之间的关系

具体调用语法如下:

1
2
3
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

现在有了这个基类之后,我们就可以开始来创建表啦! 假设我们想创建一个users表,我们只需要定义一个User类即可!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 创建映射
from sqlalchemy import Column,String,Integer,Numeric

class User(Base):
    # 指定表名
    __tablename__ = "users"

    # 指定列名
    id = Column(Integer,primary_key=True)
    # 指定名称
    name = Column(String(20),nullable=False)
    # 指定年龄
    age = Column(Integer,default=18)
    # 指定身高
    high = Column(Numeric(5,2), default=18)

    def __str__(self):
        return "User(id={},name={},age={},high={})".format(self.id,self.name,self.age,self.high)

# 创建一个实例   
user = User(name="张三",age=18,high=170.2)

注意:类至少需要一个__tablename__属性,一个主键行。

创建真实的表

1
2
3
4
# 若表不存在,则创建
Base.metadata.create_all(engine)
# 删除所有的表
Base.metadata.drop_all(engine)

创建会话

接下来,我们就需要考虑连接数据库,将user对象保存到数据库中去啦! 怎样去连接呢?😊还记得我们前面创建的engine吧!

我们只需要创建一个会话对象:session,将它与engine联系在一起就好啦! sqlalchemy给我们提供了一个专门用于创建会话的工厂类sessionmaker

1
2
3
4
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)

session = Session()

注意: 当前的Session中不会建立连接,只有当我们去使用session进行sql操作的时候,它才会去前面我们创建的engine的连接池中获取一个连接

添加记录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# 创建一个会话
session = Session()
# 创建一个user对象
user = User(name="张三",age=18,high=170.2)
# 将user保存到数据库中
session.add(user)
# 提交
session.commit()


######批量添加记录######
users = [
    User(name="张三",age=23,high=181.3),
    User(name="李四",age=24,high=191.3),
    User(name="王五",age=25,high=171.3),
]
session.add_all(users)
session.commit()

修改记录

将第一条名为张三的姓名改成阿桑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
session = Session()
# 先将数据查询出来,filter_by其实相当于是编写where条件,
user = session.query(User).filter(User.name="张三").first()
user.name = "阿桑"

session.commit()
session.close()

## 修改满足条件的
 session.query(User).filter(User.id>2).update({User.name:"小四"})

删除记录

1
2
3
4
session.delete(user)

# 删除满足条件的
session.query(User).filter(User.id>2).delete()

查询记录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ==
session.query(User).filter(User.id==1)
# !=
session.query(User).filter(User.id!=1)
# like
session.query(User).filter(User.name.like("%张%"))
# in
session.query(User).filter(User.id.in_([1,2,3]))
# not in
session.query(User).filter(~User.id.in_([1,2,3]))
# is null
session.query(User).filter(User.age == None)
# is not null
session.query(User).filter(User.age != None)
# and
from sqlalchemy import and_
session.query(User).filter(and_(User.id==1,User.name=='张三'))
# or
from sqlalchemy import or_
session.query(User).filter(or_(User.id==1,User.name=='张三'))
# limit
session.query(User).offset(2).limit(3).all()

注意:上面语句仅仅只是构建了查询的条件,要想获取执行的结果还需要调用如下几个方法:

1
2
3
4
5
6
7
first() : 返回第一条记录
all() : 返回所有记录
one() : 返回一条记录,如果查出记录不是一条,则会抛出异常,没有查到也会抛出异常
one_or_none() : 和one().唯一的区别在于,只查询一条,没有就返回None,查出多条抛出异常
scalar(): 在聚合查询的时候,返回结果数值

get(主键): 直接根据主键去查询    
  • 聚合查询
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from sqlalchemy import func
def test_func():
    session = Session()

    result = session.query(func.count(User.id)).scalar()
    result = session.query(func.max(User.id)).scalar()
    result = session.query(func.min(User.id)).scalar()
    result = session.query(func.sum(User.id)).scalar()
    result = session.query(func.avg(User.id)).scalar()

    session.close()

sqlacodegen

官网地址:https://pypi.org/project/sqlacodegen/

安装

1
pip install sqlacodegen

查看命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
sqlacodegen --help

optional arguments:
  -h, --help         show this help message and exit
  --version          print the version number and exit
  --schema SCHEMA    load tables from an alternate schema
  --tables TABLES    tables to process (comma-separated, default: all)
  --noviews          ignore views
  --noindexes        ignore indexes
  --noconstraints    ignore constraints
  --nojoined         don't autodetect joined table inheritance
  --noinflect        don't try to convert tables names to singular form
  --noclasses        don't generate classes, only tables
  --outfile OUTFILE  file to write output to (default: stdout)

使用示例

1
2
3
4
5
6
7
sqlacodegen mysql://user:password@localhost/dbname
sqlacodegen sqlite:///database.db

sqlacodegen mysql://root:123456@localhost:3306/day25 --outfile "model.py"

sqlacodegen mysql://root:123456@localhost:3306/test --outfile "model.py"
sqlacodegen mysql://root:123456@localhost:3306/test --tables users --outfile "users.py"

查看直接生成的model文件

怎么样? 很神奇吧!😊感觉一下子就可以放下鼠标,喝半天咖啡等待下班啦!😂

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# coding: utf-8
from sqlalchemy import Column, DECIMAL, Date, Float, ForeignKey, String, TIMESTAMP, Table, text
from sqlalchemy.dialects.mysql import INTEGER
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
metadata = Base.metadata


class Category(Base):
    __tablename__ = 'category'

    cid = Column(INTEGER(11), primary_key=True)
    cname = Column(String(20))
    cdesc = Column(String(50))


class User(Base):
    __tablename__ = 'user'

    uid = Column(INTEGER(11), primary_key=True)
    name = Column(String(10))
    password = Column(String(20))
    mobile = Column(String(11))



class Order(Base):
    __tablename__ = 'orders'

    oid = Column(INTEGER(11), primary_key=True)
    total = Column(Float(asdecimal=True))
    otime = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
    uno = Column(ForeignKey('user.uid'), index=True)

    user = relationship('User')


class Product(Base):
    __tablename__ = 'product'

    pid = Column(INTEGER(11), primary_key=True)
    pname = Column(String(10))
    price = Column(Float(asdecimal=True))
    pdesc = Column(String(50))
    cno = Column(ForeignKey('category.cid'), index=True)

    category = relationship('Category')


t_orderitem = Table(
    'orderitem', metadata,
    Column('ono', ForeignKey('orders.oid'), index=True),
    Column('pno', ForeignKey('product.pid'), index=True),
    Column('num', INTEGER(11)),
    Column('subtotal', Float(asdecimal=True))
)

注意:** url的写法

1
2
3
4
5
6
7
8
9
engine = create_engine('sqlite:///foo.db')
# Unix/Mac - 4 initial slashes in total
engine = create_engine('sqlite:////absolute/path/to/foo.db')

# Windows
engine = create_engine('sqlite:///C:\\path\\to\\foo.db')

# Windows alternative using raw string
engine = create_engine(r'sqlite:///C:\path\to\foo.db')