使用SQLAlchemy的表达式语言进行高效Python代码查询

22,734次阅读
没有评论

共计 8757 个字符,预计需要花费 22 分钟才能阅读完成。

通过定义数据模型并使用 SQLAlchemy 的 ORM 中的会话对象,完全在 Python 中处理应用程序的数据。

到目前为止,我们的 SQLAlchemy 之旅已经涵盖了管理数据库连接和模型创建。然而,我们如何从数据库中提取我们想要的数据呢?

SQLAlchemy 的 ORM 查询 API 简化了我们编写数据库查询的方式。我们可以通过将检索数据的方法链接在一起来在 SQLAlchemy 会话上构造查询,而不是编写原始 SQL 查询。我们将深入研究 SQLAlchemy 的广泛查询 API,以了解查询数据的所有方式。

创建 a Session

我们在 上一篇文章中介绍了 SQLAlchemy 会话创建,并在之前的文章中解释了引擎的概念。如果您跳过了这些帖子,请不要跳过。以下是复制 + 面食礼貌:

"""数据库引擎和会话创建。"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


engine = create_engine(
    'mysql+pymysql://user:password@host:3600/database',
    echo=True
)
Session = sessionmaker(bind=engine)
session = Session()

基本查询语法

让我们快速熟悉 SQLAlchemy 查询 API 的基本结构。SQLAlchemy 会话对象有一个 query()方法,它接受我们之前定义的数据模型的原始类。Customer 下面是在模型上运行的查询的简单开始;或者换句话说,对客户 SQL 表进行查询:

从 SQLAlchemy 会话中选择记录:

"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer


# ORM 查询的示例结构
records = session
    .query(Customer)
    .FUNCTION()

在我们向链中添加另一种方法之前,调用.query(Customer)我们的会话并不是一个有效的查询。所有会话查询都以最终方法结束,以塑造 / 预测查询结果:

  • all()将返回与我们的查询匹配的所有记录作为对象列表。如果我们在上面的查询中使用 allList[Customer],我们将收到 Python 数据类型的所有客户记录。

  • first()返回与查询匹配的第一条记录,无论有多少记录与查询匹配(“第一条”的构成取决于表的排序方式)。这相当于添加 LIMIT 1 到 SQL 查询中。因此,要返回的 Python 类型将为 Customer.

  • one()对于我们正在执行的查询最多应存在一条记录的情况(考虑按主键查询)非常有用。在创建记录之前验证记录是否存在时,此语法特别有用。

  • scalar()如果存在则返回单个值,如果不存在值则返回 None,或者如果返回多条记录则引发异常。

  • get([VALUE(S)])搜索模型的主键以返回主键等于所提供的值的行。get()如果应搜索多个外键,也接受元组。最后,还 get()可以接受字典并返回列(字典键)与提供的值匹配的行。

要创建更复杂的查询,我们将通过原始查询上的链接方法添加到查询中:

复杂的 SELECT 查询:

"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer


# Example structure of an ORM query
records = session
    .query(Customer)
    .METHOD_1()
    .METHOD_2()
    .FUNCTION()

查询结果

如果我们执行返回多条记录的查询,我们需要循环它们才能查看结果:

获取所有客户记录并打印输出:

"""从 SQLAlchemy 会话构建数据库查询。"""
from .database import session
from .models import Customer


# 获取所有客户记录
records = session
    .query(Customer)
    .all()

# 循环记录
for record in records:
    print(record)

默认情况下,SQLAlchemy ORM 将返回类的实例,这意味着上面的内容将产生以下输出:

会话查询的输出:










如果您想获取字典,请使用内置__dict__方法:

以字典形式查看查询结果:

...

records = session
    .query(Customer)
    .all()

for record in records:
    pp.pprint(record.__dict__)

相反,这会返回每行的字典对象:

会话查询的输出作为字典:

{'_sa_instance_state': ,
    'email': 'kpaladini5i@senate.gov',
    'first_name': 'Kenna',
    'id': 199,
    'join_date': datetime.datetime(2019, 4, 19, 0, 0),
    'last_name': 'Paladini',
    'preferred_language': 'Bulgarian'}
{'_sa_instance_state': ,
    'email': 'rlebrun5j@narod.ru',
    'first_name': 'Rriocard',
    'id': 200,
    'join_date': datetime.datetime(2015, 6, 8, 0, 0),
    'last_name': 'Le Brun',
    'preferred_language': 'Khmer'},
...

当然,您也可以创建自己的对象来只接收您想要 / 需要的列:

反序列化查询结果:

...

# 获取所有客户
records = session.query(Customer).all()

# 循环记录
for record in records:
    recordObject = {
        'name': record.name,
        'position': record.position,
        'team_name': record.team.name,
        'team_city': record.team.city
    }
    print(recordObject)

这输出了一些更干净的东西:

查询反序列化的输出:

{   'email': 'kpaladini5i@senate.gov',
    'first_name': 'Kenna',
    'join_date': datetime.datetime(2019, 4, 19, 0, 0),
    'last_name': 'Paladini',
    'preferred_language': 'Bulgarian'}
{   'email': 'rlebrun5j@narod.ru',
    'first_name': 'Rriocard',
    'join_date': datetime.datetime(2015, 6, 8, 0, 0),
    'last_name': 'Le Brun',
    'preferred_language': 'Khmer'},
...

过滤结果

查询中最常用的方法可能就是 filter()方法。filter()相当于 SQL WHERE 子句,仅返回符合我们想要的条件的行:

选择所有名为 Carl 的客户:

...

# 获取“first_name”为“Carl”的记录
records = session
    .query(Customer)
    .filter(Customer)
    .first_name == 'Carl')
    .all()

filter_by()

我们可以使用该方法编写上述查询,filter_by()如下所示:

过滤器 filter_by:

...

# 获取“first_name”为“Carl”的记录
records = session
    .query(Customer)
    .filter_by(first_name="Carl")
    .all()

与 不同的是 filter(),filter_by()接受关键字参数(请注意此处语法的差异:filter()针对列对象检查条件,而 filter_by()查找与我们传递的参数匹配的列)。filter_by()只能搜索精确值,并作为简单过滤查询的一种简写。

like()

我们可以做的不仅仅是过滤简单的条件。SQLAlchemy 有一个 like()与 SQL 的工作方式等效的方法 LIKE:

选择名字以“J”开头的客户记录:

...

# 获取“first_name”以字母“J”开头的记录
records = session
    .query(Customer)
    .filter(Customer.first_name.like('J%'))
    .all()

正如预期的那样,这将为我们提供客户名字以 J 开头的所有行:

{   'email': 'jpugsley9@netvibes.com',
    'first_name': 'Jarid',
    'join_date': datetime.datetime(2017, 10, 11, 0, 0),
    'last_name': 'Pugsley',
    'preferred_language': 'Burmese'}
{   'email': 'jdymockek@is.gd',
    'first_name': 'Jeanna',
    'join_date': datetime.datetime(2017, 11, 13, 0, 0),
    'last_name': 'Dymocke',
    'preferred_language': 'Malayalam'}
...

高级查询方法

除此之外 filter(),还有一些我们应该熟悉的基本方法。其中每一个都对应于您可能熟悉的 SQL 关键字:

  • limit([INTEGER]):将行数限制为所提供的最大行数。

  • order_by([COLUMN]):按提供的列对结果进行排序。

  • offset([INTEGER]):从第 n 行开始查询。

下一部分涉及在模型之间执行 JOIN 查询,这要求我们首先定义模型上的关系。目前事情有点乱,因为我实际上要等到下一篇文章才会讨论这个问题。抱歉打扰了,我正在努力!

连接不同表中的记录并反序列化记录

执行 JOIN 和 UNION

我们之前已经接触过 JOIN,但我们即将将其提升一个档次。我们正在使用两种数据模型:一种用于客户,一种用于订单。每位顾客

连接不同表中的记录并反序列化记录:

...
import pprint
from .models import Order, Customer


pp = pprint.PrettyPrinter(indent=4)


# 对 JOINed 表执行 SELECT 查询
records = session
    .query(Customer)
    .join(Order, Order.customer_id == Customer.id)
    .all()


# 循环遍历结果
for record in records:
    record_object = {
        'first_name': record.first_name,
        'last_name': record.last_name,
        'email': record.email,
        'preferred_language': record.preferred_language,
        'join_date': record.join_date,
        'orders': []}
    for order in record.order:
        order = {
            'order_price': order.price,
            'currency':  order.currency,
            'purchase_date':  order.purchase_date,
            'product':  order.product
        }
        record_object['orders'].append(order)
        pp.pprint(record_object)

我们使用该方法执行 JOIN join()。我们传递的第一个参数是我们将在“右侧”加入的数据模型。然后,我们指定要“加入”的内容:订单模型的 customer_id 列和客户模型的 id 列。

我们的外循环为我们提供了每个客户,而我们的内循环将每个订单添加到适当的客户。查看示例记录:

{   'email': 'jtinline16@arizona.edu',
    'first_name': 'Jerry',
    'join_date': datetime.datetime(2016, 10, 27, 0, 0),
    'last_name': 'Tinline',
    'preferred_language': 'Icelandic',
    'orders': [{'currency': 'IDR',
                'order_price': 34.24,
                'product': 'Beer - Corona',
                'purchase_date': datetime.datetime(2019, 5, 5, 0, 0)},
               {'currency': 'GEL',
                'order_price': 25.75,
                'product': 'Creamers - 10%',
                'purchase_date': datetime.datetime(2019, 1, 27, 0, 0)}]}

我们的朋友杰瑞有两份订单:一份是科罗娜啤酒,另一份是奶精。开始吧,杰瑞。

外连接

除了简单的 JOIN 之外,我们还可以使用相同的语法执行外部 JOIN:

...
from .models import ExampleModel1, ExampleModel2


# 执行外部 JOIN
records = session
    .query(ExampleModel1)
    .outerjoin(ExampleModel2)
    .all()

Unions

我们还可以执行 UNION 和 UNION ALL:

...
from .models import ExampleModel1, ExampleModel2


# 执行 UNION
records = ExampleModel1.union(ExampleModel2)

要执行全部联合,只需替换 union()为 union_all()!

聚合函数和统计数据

与所有类似 SQL 的查询语言一样,我们也可以执行一些聚合统计。我们可以使用以下内容:

  • count([COLUMN]):计算列中的记录数。

  • count(distinct([COLUMN])):计算列中不同记录的数量。

  • sum([COLUMN]):将列中的数值相加。

以下是我们如何执行对列中的值进行计数的查询:

...
from sqlalchemy import func


# 计算具有“first_name”值的记录数
records = session
    .query(func.count(Customer.first_name))
    .all()

for record in records:
    print(record)

哪个输出:

(200,)

可以轻松修改此查询以仅计算不同值:

...
from sqlalchemy import func
from sqlalchemy import distinct


# 计算不同“first_name”值的数量
records = session
    .query(func.count(distinct(Customer.first_name)))
    .all()

for record in records:
    print(record)

使用 Group_by()

当然,我们 group_by()也可以在基于聚合的查询上使用该方法。group_by()其工作原理与我们对 SQL 和 Pandas 的期望类似:

“分组依据”聚合:

...

# 执行“GROUP BY”聚合查询
records = session
    .query(func.count(Customer.first_name))
    .group_by(Customer.first_name)
    .all()

突变(Mutations)

我们花了很多时间研究如何从数据库中提取数据,但还没有讨论修改数据!今天我们议程上的最后一项是研究如何使用 SQLAlchemy ORM 添加、删除和更改记录。

插入行

我们添加数据的第一种方法是使用该 add()方法。add()期望传递一个类的实例(特别是数据模型),并将创建一个新的数据库行作为结果:

通过 ORM 插入记录:

from .database import session
from .models import Customer


# Inserting records via data models
customer = Customer(
    first_name='Todd',
    last_name='Birchard',
    email='fake@example.com',
    preferred_language='English',
    join_date=datetime.now())
session.add(customer)
session.commit()

添加数据的另一种方法是使用该 insert()方法。与 不同的是 add(),在 SQLAlchemy Tableinsert()对象上调用,并且不依赖于接收数据模型。不是 ORM 的一部分:insert()

...

# 通过 SQLAlchemy `Table` 对象插入记录
insert = [TABLE]
    .insert()
    .values(
        first_name='Todd',
        last_name='Jack Jones',
        email='fake@example.com',
        preferred_language='English',
        join_date=datetime.now())

更新中 Updating

基于 的语法 insert(),我们可以添加该 update()方法来更改现有记录的值。我们链接该 where()方法来指定应更新哪些行:

...

# 通过 SQLAlchemy `Table` 对象更新记录
result = [TABLE]
    .update()
    .where([TABLE].c.name == 'Todd')
    .values(email='newemail@example.com')

正在删除 Deleting

在我们执行的任何查询中,我们可以附加 delete()方法来删除该查询中包含的所有行(小心!)。下面删除 first_name 列包含值“Carl”的所有记录:

...

# 删除“first_name”为“Carl”的记录
result = session
    .query(Customer)
    .filter(Customer.first_name == 'Carl')
    .delete()

delete()接受 synchronize_session 参数,该参数确定应如何处理删除:

  • False 在提交会话之前不会执行删除。

  • ‘fetch’ 选择要删除的所有行并删除匹配的行。

  • ‘evaluate’ 将评估当前会话中的对象以确定应删除哪些行。

关键词:SQLAlchemy,表达式语言,Python 代码查询,数据库查询,会话对象 文章来源地址 https://www.toymoban.com/diary/sql/580.html

到此这篇关于使用 SQLAlchemy 的表达式语言进行高效 Python 代码查询的文章就介绍到这了, 更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持 TOY 模板网!

    正文完
     0
    Yojack
    版权声明:本篇文章由 Yojack 于1970-01-01发表,共计8757字。
    转载说明:
    1 本网站名称:优杰开发笔记
    2 本站永久网址:https://yojack.cn
    3 本网站的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,请联系站长进行删除处理。
    4 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
    5 本站所有内容均可转载及分享, 但请注明出处
    6 我们始终尊重原创作者的版权,所有文章在发布时,均尽可能注明出处与作者。
    7 站长邮箱:laylwenl@gmail.com
    评论(没有评论)