共计 8757 个字符,预计需要花费 22 分钟才能阅读完成。
通过定义数据模型并使用 SQLAlchemy 的 ORM 中的会话对象,完全在 Python 中处理应用程序的数据。
到目前为止,我们的 SQLAlchemy 之旅已经涵盖了管理数据库连接和模型创建。然而,我们如何从数据库中提取我们想要的数据呢?
SQLAlchemy 的 ORM 查询 API 简化了我们编写数据库查询的方式。我们可以通过将检索数据的方法链接在一起来在 SQLAlchemy 会话上构造查询,而不是编写原始 SQL 查询。我们将深入研究 SQLAlchemy 的广泛查询 API,以了解查询数据的所有方式。
创建 a Session
我们在 上一篇文章中介绍了 SQLAlchemy 会话创建,并在之前的文章中解释了引擎的概念。如果您跳过了这些帖子,请不要跳过。以下是复制 + 面食礼貌:
"""数据库引擎和会话创建。"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
'mysql+pymysql://user:password@host:3600/database',
echo=True
)
Session = sessionmaker(bind=engine)
session = Session()
基本查询语法
让我们快速熟悉 SQLAlchemy 查询 API 的基本结构。SQLAlchemy 会话对象有一个 query()方法,它接受我们之前定义的数据模型的原始类。Customer 下面是在模型上运行的查询的简单开始;或者换句话说,对客户 SQL 表进行查询:
从 SQLAlchemy 会话中选择记录:
"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer
# ORM 查询的示例结构
records = session
.query(Customer)
.FUNCTION()
在我们向链中添加另一种方法之前,调用.query(Customer)我们的会话并不是一个有效的查询。所有会话查询都以最终方法结束,以塑造 / 预测查询结果:
-
all()将返回与我们的查询匹配的所有记录作为对象列表。如果我们在上面的查询中使用 allList[Customer],我们将收到 Python 数据类型的所有客户记录。
-
first()返回与查询匹配的第一条记录,无论有多少记录与查询匹配(“第一条”的构成取决于表的排序方式)。这相当于添加 LIMIT 1 到 SQL 查询中。因此,要返回的 Python 类型将为 Customer.
-
one()对于我们正在执行的查询最多应存在一条记录的情况(考虑按主键查询)非常有用。在创建记录之前验证记录是否存在时,此语法特别有用。
-
scalar()如果存在则返回单个值,如果不存在值则返回 None,或者如果返回多条记录则引发异常。
-
get([VALUE(S)])搜索模型的主键以返回主键等于所提供的值的行。get()如果应搜索多个外键,也接受元组。最后,还 get()可以接受字典并返回列(字典键)与提供的值匹配的行。
要创建更复杂的查询,我们将通过原始查询上的链接方法添加到查询中:
复杂的 SELECT 查询:
"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer
# Example structure of an ORM query
records = session
.query(Customer)
.METHOD_1()
.METHOD_2()
.FUNCTION()
查询结果
如果我们执行返回多条记录的查询,我们需要循环它们才能查看结果:
获取所有客户记录并打印输出:
"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer
# 获取所有客户记录
records = session
.query(Customer)
.all()
# 循环记录
for record in records:
print(record)
默认情况下,SQLAlchemy ORM 将返回类的实例,这意味着上面的内容将产生以下输出:
会话查询的输出:
如果您想获取字典,请使用内置__dict__方法:
以字典形式查看查询结果:
...
records = session
.query(Customer)
.all()
for record in records:
pp.pprint(record.__dict__)
相反,这会返回每行的字典对象:
会话查询的输出作为字典:
{'_sa_instance_state': ,
'email': 'kpaladini5i@senate.gov',
'first_name': 'Kenna',
'id': 199,
'join_date': datetime.datetime(2019, 4, 19, 0, 0),
'last_name': 'Paladini',
'preferred_language': 'Bulgarian'}
{'_sa_instance_state': ,
'email': 'rlebrun5j@narod.ru',
'first_name': 'Rriocard',
'id': 200,
'join_date': datetime.datetime(2015, 6, 8, 0, 0),
'last_name': 'Le Brun',
'preferred_language': 'Khmer'},
...
当然,您也可以创建自己的对象来只接收您想要 / 需要的列:
反序列化查询结果:
...
# 获取所有客户
records = session.query(Customer).all()
# 循环记录
for record in records:
recordObject = {
'name': record.name,
'position': record.position,
'team_name': record.team.name,
'team_city': record.team.city
}
print(recordObject)
这输出了一些更干净的东西:
查询反序列化的输出:
{ 'email': 'kpaladini5i@senate.gov',
'first_name': 'Kenna',
'join_date': datetime.datetime(2019, 4, 19, 0, 0),
'last_name': 'Paladini',
'preferred_language': 'Bulgarian'}
{ 'email': 'rlebrun5j@narod.ru',
'first_name': 'Rriocard',
'join_date': datetime.datetime(2015, 6, 8, 0, 0),
'last_name': 'Le Brun',
'preferred_language': 'Khmer'},
...
过滤结果
查询中最常用的方法可能就是 filter()方法。filter()相当于 SQL WHERE 子句,仅返回符合我们想要的条件的行:
选择所有名为 Carl 的客户:
...
# 获取“first_name”为“Carl”的记录
records = session
.query(Customer)
.filter(Customer)
.first_name == 'Carl')
.all()
filter_by()
我们可以使用该方法编写上述查询,filter_by()如下所示:
过滤器 filter_by:
...
# 获取“first_name”为“Carl”的记录
records = session
.query(Customer)
.filter_by(first_name="Carl")
.all()
与 不同的是 filter(),filter_by()接受关键字参数(请注意此处语法的差异:filter()针对列对象检查条件,而 filter_by()查找与我们传递的参数匹配的列)。filter_by()只能搜索精确值,并作为简单过滤查询的一种简写。
like()
我们可以做的不仅仅是过滤简单的条件。SQLAlchemy 有一个 like()与 SQL 的工作方式等效的方法 LIKE:
选择名字以“J”开头的客户记录:
...
# 获取“first_name”以字母“J”开头的记录
records = session
.query(Customer)
.filter(Customer.first_name.like('J%'))
.all()
正如预期的那样,这将为我们提供客户名字以 J 开头的所有行:
{ 'email': 'jpugsley9@netvibes.com',
'first_name': 'Jarid',
'join_date': datetime.datetime(2017, 10, 11, 0, 0),
'last_name': 'Pugsley',
'preferred_language': 'Burmese'}
{ 'email': 'jdymockek@is.gd',
'first_name': 'Jeanna',
'join_date': datetime.datetime(2017, 11, 13, 0, 0),
'last_name': 'Dymocke',
'preferred_language': 'Malayalam'}
...
高级查询方法
除此之外 filter(),还有一些我们应该熟悉的基本方法。其中每一个都对应于您可能熟悉的 SQL 关键字:
-
limit([INTEGER]):将行数限制为所提供的最大行数。
-
order_by([COLUMN]):按提供的列对结果进行排序。
-
offset([INTEGER]):从第 n 行开始查询。
下一部分涉及在模型之间执行 JOIN 查询,这要求我们首先定义模型上的关系。目前事情有点乱,因为我实际上要等到下一篇文章才会讨论这个问题。抱歉打扰了,我正在努力!
连接不同表中的记录并反序列化记录
执行 JOIN 和 UNION
我们之前已经接触过 JOIN,但我们即将将其提升一个档次。我们正在使用两种数据模型:一种用于客户,一种用于订单。每位顾客
连接不同表中的记录并反序列化记录:
...
import pprint
from .models import Order, Customer
pp = pprint.PrettyPrinter(indent=4)
# 对 JOINed 表执行 SELECT 查询
records = session
.query(Customer)
.join(Order, Order.customer_id == Customer.id)
.all()
# 循环遍历结果
for record in records:
record_object = {
'first_name': record.first_name,
'last_name': record.last_name,
'email': record.email,
'preferred_language': record.preferred_language,
'join_date': record.join_date,
'orders': []}
for order in record.order:
order = {
'order_price': order.price,
'currency': order.currency,
'purchase_date': order.purchase_date,
'product': order.product
}
record_object['orders'].append(order)
pp.pprint(record_object)
我们使用该方法执行 JOIN join()。我们传递的第一个参数是我们将在“右侧”加入的数据模型。然后,我们指定要“加入”的内容:订单模型的 customer_id 列和客户模型的 id 列。
我们的外循环为我们提供了每个客户,而我们的内循环将每个订单添加到适当的客户。查看示例记录:
{ 'email': 'jtinline16@arizona.edu',
'first_name': 'Jerry',
'join_date': datetime.datetime(2016, 10, 27, 0, 0),
'last_name': 'Tinline',
'preferred_language': 'Icelandic',
'orders': [{'currency': 'IDR',
'order_price': 34.24,
'product': 'Beer - Corona',
'purchase_date': datetime.datetime(2019, 5, 5, 0, 0)},
{'currency': 'GEL',
'order_price': 25.75,
'product': 'Creamers - 10%',
'purchase_date': datetime.datetime(2019, 1, 27, 0, 0)}]}
我们的朋友杰瑞有两份订单:一份是科罗娜啤酒,另一份是奶精。开始吧,杰瑞。
外连接
除了简单的 JOIN 之外,我们还可以使用相同的语法执行外部 JOIN:
...
from .models import ExampleModel1, ExampleModel2
# 执行外部 JOIN
records = session
.query(ExampleModel1)
.outerjoin(ExampleModel2)
.all()
Unions
我们还可以执行 UNION 和 UNION ALL:
...
from .models import ExampleModel1, ExampleModel2
# 执行 UNION
records = ExampleModel1.union(ExampleModel2)
要执行全部联合,只需替换 union()为 union_all()!
聚合函数和统计数据
与所有类似 SQL 的查询语言一样,我们也可以执行一些聚合统计。我们可以使用以下内容:
-
count([COLUMN]):计算列中的记录数。
-
count(distinct([COLUMN])):计算列中不同记录的数量。
-
sum([COLUMN]):将列中的数值相加。
以下是我们如何执行对列中的值进行计数的查询:
...
from sqlalchemy import func
# 计算具有“first_name”值的记录数
records = session
.query(func.count(Customer.first_name))
.all()
for record in records:
print(record)
哪个输出:
(200,)
可以轻松修改此查询以仅计算不同值:
...
from sqlalchemy import func
from sqlalchemy import distinct
# 计算不同“first_name”值的数量
records = session
.query(func.count(distinct(Customer.first_name)))
.all()
for record in records:
print(record)
使用 Group_by()
当然,我们 group_by()也可以在基于聚合的查询上使用该方法。group_by()其工作原理与我们对 SQL 和 Pandas 的期望类似:
“分组依据”聚合:
...
# 执行“GROUP BY”聚合查询
records = session
.query(func.count(Customer.first_name))
.group_by(Customer.first_name)
.all()
突变(Mutations)
我们花了很多时间研究如何从数据库中提取数据,但还没有讨论修改数据!今天我们议程上的最后一项是研究如何使用 SQLAlchemy ORM 添加、删除和更改记录。
插入行
我们添加数据的第一种方法是使用该 add()方法。add()期望传递一个类的实例(特别是数据模型),并将创建一个新的数据库行作为结果:
通过 ORM 插入记录:
from .database import session
from .models import Customer
# Inserting records via data models
customer = Customer(
first_name='Todd',
last_name='Birchard',
email='fake@example.com',
preferred_language='English',
join_date=datetime.now())
session.add(customer)
session.commit()
添加数据的另一种方法是使用该 insert()方法。与 不同的是 add(),在 SQLAlchemy Tableinsert()对象上调用,并且不依赖于接收数据模型。不是 ORM 的一部分:insert()
...
# 通过 SQLAlchemy `Table` 对象插入记录
insert = [TABLE]
.insert()
.values(
first_name='Todd',
last_name='Jack Jones',
email='fake@example.com',
preferred_language='English',
join_date=datetime.now())
更新中 Updating
基于 的语法 insert(),我们可以添加该 update()方法来更改现有记录的值。我们链接该 where()方法来指定应更新哪些行:
...
# 通过 SQLAlchemy `Table` 对象更新记录
result = [TABLE]
.update()
.where([TABLE].c.name == 'Todd')
.values(email='newemail@example.com')
正在删除 Deleting
在我们执行的任何查询中,我们可以附加 delete()方法来删除该查询中包含的所有行(小心!)。下面删除 first_name 列包含值“Carl”的所有记录:
...
# 删除“first_name”为“Carl”的记录
result = session
.query(Customer)
.filter(Customer.first_name == 'Carl')
.delete()
delete()接受 synchronize_session 参数,该参数确定应如何处理删除:
-
False 在提交会话之前不会执行删除。
-
‘fetch’ 选择要删除的所有行并删除匹配的行。
-
‘evaluate’ 将评估当前会话中的对象以确定应删除哪些行。文章来源:https://www.toymoban.com/diary/sql/580.html
关键词:SQLAlchemy,表达式语言,Python 代码查询,数据库查询,会话对象 文章来源地址 https://www.toymoban.com/diary/sql/580.html
到此这篇关于使用 SQLAlchemy 的表达式语言进行高效 Python 代码查询的文章就介绍到这了, 更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持 TOY 模板网!