概述
sqlalchemy
One of the key aspects of any data science workflow is the sourcing, cleaning, and storing of raw data in a form that can be used upstream. This process is commonly referred to as “Extract-Transform-Load,” or ETL for short.
任何数据科学工作流程的关键方面之一就是以可以在上游使用的形式采购,清理和存储原始数据。 此过程通常称为“提取-转换-加载”或简称ETL。
It is important to design efficient, robust, and reliable ETL processes, or “data pipelines.” An inefficient pipeline will make working with data slow and unproductive. A non-robust pipeline will break easily, leaving gaps.
设计有效,健壮和可靠的ETL流程或“数据管道”非常重要。 效率低下的管道会使处理数据的速度变慢且效率低下。 非健壮的管道将很容易中断,并留下空白。
Worse still, an unreliable data pipeline will silently contaminate your database with false data that may not become apparent until damage has been done.
更糟糕的是,不可靠的数据管道将以错误的数据静默污染数据库,这些错误的数据在损坏完成之前可能不会变得明显。
Although critically important, ETL development can be a slow and cumbersome process at times. Luckily, there are open source solutions that make life much easier.
尽管非常重要,但ETL开发有时可能是一个缓慢而繁琐的过程。 幸运的是,有开源解决方案可以使生活更加轻松。
什么是SQLAlchemy? (What is SQLAlchemy?)
One such solution is a Python module called SQLAlchemy. It allows data engineers and developers to define schemas, write queries, and manipulate SQL databases entirely through Python.
一种这样的解决方案是称为SQLAlchemy的Python模块。 它允许数据工程师和开发人员完全通过Python定义架构,编写查询和操作SQL数据库。
SQLAlchemy’s Object Relational Mapper (ORM) and Expression Language functionalities iron out some of the idiosyncrasies apparent between different implementations of SQL by allowing you to associate Python classes and constructs with data tables and expressions.
SQLAlchemy的对象关系映射器(ORM)和表达式语言功能允许您将Python类和构造与数据表和表达式相关联,从而消除了不同SQL实现之间的某些特质。
Here, we’ll run through some highlights of SQLAlchemy to discover what it can do and how it can make ETL development a smoother process.
在这里,我们将遍历SQLAlchemy的一些要点,以发现它可以做什么以及如何使ETL开发更顺畅。
配置 (Setting up)
You can install SQLAlchemy using the pip package installer.
您可以使用pip软件包安装程序安装SQLAlchemy。
$ sudo pip install sqlalchemy
As for SQL itself, there are many different versions available, including MySQL, Postgres, Oracle, and Microsoft SQL Server. For this article, we’ll be using SQLite.
至于SQL本身,有许多可用的版本,包括MySQL,Postgres,Oracle和Microsoft SQL Server。 对于本文,我们将使用SQLite。
SQLite is an open-source implementation of SQL that usually comes pre-installed with Linux and Mac OS X. It is also available for Windows. If you don’t have it on your system already, you can follow these instructions to get up and running.
SQLite是SQL的开源实现,通常预装在Linux和Mac OS X中。它也可用于Windows。 如果您的系统上尚未安装它,则可以按照以下说明进行操作 。
In a new directory, use the terminal to create a new database:
在新目录中,使用终端创建新数据库:
$ mkdir sqlalchemy-demo && cd sqlalchemy-demo
$ touch demo.db
定义架构 (Defining a schema)
A database schema defines the structure of a database system, in terms of tables, columns, fields, and the relationships between them. Schemas can be defined in raw SQL, or through the use of SQLAlchemy’s ORM feature.
数据库模式根据表,列,字段以及它们之间的关系来定义数据库系统的结构。 模式可以在原始SQL中定义,也可以通过使用SQLAlchemy的ORM功能进行定义。
Below is an example showing how to define a schema of two tables for an imaginary blogging platform. One is a table of users, and the other is a table of posts uploaded.
下面的示例显示了如何为虚构的博客平台定义两个表的架构。 一个是用户表,另一个是上载的帖子表。
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *
engine = create_engine('sqlite:///demo.db')
Base = declarative_base()
class Users(Base):
__tablename__ = "users"
UserId = Column(Integer, primary_key=True)
Title = Column(String)
FirstName = Column(String)
LastName = Column(String)
Email = Column(String)
Username = Column(String)
DOB = Column(DateTime)
class Uploads(Base):
__tablename__ = "uploads"
UploadId = Column(Integer, primary_key=True)
UserId = Column(Integer)
Title = Column(String)
Body = Column(String)
Timestamp = Column(DateTime)
Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)
First, import everything you need from SQLAlchemy. Then, use create_engine(connection_string)
to connect to your database. The exact connection string will depend on the version of SQL you are working with. This example uses a relative path to the SQLite database created earlier.
首先,从SQLAlchemy导入所需的一切。 然后,使用create_engine(connection_string)
连接到数据库。 确切的连接字符串将取决于您使用SQL版本。 本示例使用到先前创建SQLite数据库的相对路径。
Next, start defining your table classes. The first one in the example is Users
. Each column in this table is defined as a class variable using SQLAlchemy’s Column(type)
, where type
is a data type (such as Integer
, String
, DateTime
and so on). Use primary_key=True
to denote columns which will be used as primary keys.
接下来,开始定义表类。 示例中的第一个是Users
。 该表中的每一列都使用SQLAlchemy的Column(type)
定义为类变量,其中type
是数据类型(例如Integer
, String
, DateTime
等)。 使用primary_key=True
表示将用作主键的列。
The next table defined here is Uploads
. It’s very much the same idea — each column is defined as before.
此处定义的下一个表格是Uploads
。 这几乎是相同的想法-每列的定义都像以前一样。
The final two lines actually create the tables. The checkfirst=True
parameter ensures that new tables are only created if they do not currently exist in the database.
最后两行实际上创建了表。 checkfirst=True
参数可确保仅在数据库中当前不存在新表时才创建它们。
提取 (Extract)
Once the schema has been defined, the next task is to extract the raw data from its source. The exact details can vary wildly from case to case, depending on how the raw data is provided. Maybe your app calls an in-house or third-party API, or perhaps you need to read data logged in a CSV file.
定义架构后,下一个任务是从其源中提取原始数据。 具体情况可能因情况而异,具体取决于提供原始数据的方式。 也许您的应用调用了内部API或第三方API,或者您可能需要读取记录在CSV文件中的数据。
The example below uses two APIs to simulate data for the fictional blogging platform described above. The Users
table will be populated with profiles randomly generated at randomuser.me, and the Uploads
table will contain lorem ipsum-inspired data courtesy of JSONPlaceholder.
下面的示例使用两个API为上述虚构的博客平台模拟数据。 将在Users
表中填充在randomuser.me处随机生成的配置文件,并且Uploads
表将包含受lorem ipsum启发的JSONPlaceholder数据。
Python’s Requests
module can be used to call these APIs, as shown below:
可以使用Python的Requests
模块来调用这些API,如下所示:
import requests
url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()
The data is currently held in two objects (users_json
and uploads_json
) in JSON format. The next step will be to transform and load this data into the tables defined earlier.
数据当前以JSON格式保存在两个对象( users_json
和uploads_json
)中。 下一步将转换此数据并将其加载到之前定义的表中。
转变 (Transform)
Before the data can be loaded into the database, it is important to ensure that it is in the correct format. The JSON objects created in the code above are nested, and contain more data than is required for the tables defined.
在将数据加载到数据库之前,重要的是要确保其格式正确。 在上面的代码中创建的JSON对象是嵌套的,并且包含的数据量超过定义的表所需的数据量。
An important intermediary step is to transform the data from its current nested JSON format to a flat format that can be safely written to the database without error.
一个重要的中间步骤是从其当前嵌套JSON格式变换的数据可以被安全地写入到数据库而不会出现错误的平坦格式。
For the example running through this article, the data are relatively simple, and won’t need much transformation. The code below creates two lists, users
and uploads
, which will be used in the final step:
对于本文中运行的示例,数据相对简单,不需要太多转换。 下面的代码创建两个列表, users
和uploads
,将在最后一步中使用它们:
from datetime import datetime, timedelta
from random import randint
users, uploads = [], []
for i, result in enumerate(users_json['results']):
row = {}
row['UserId'] = i
row['Title'] = result['name']['title']
row['FirstName'] = result['name']['first']
row['LastName'] = result['name']['last']
row['Email'] = result['email']
row['Username'] = result['login']['username']
dob = datetime.strptime(result['dob'],'%Y-%m-%d %H:%M:%S')
row['DOB'] = dob.date()
users.append(row)
for result in uploads_json:
row = {}
row['UploadId'] = result['id']
row['UserId'] = result['userId']
row['Title'] = result['title']
row['Body'] = result['body']
delta = timedelta(seconds=randint(1,86400))
row['Timestamp'] = datetime.now() - delta
uploads.append(row)
The main step here is to iterate through the JSON objects created before. For each result, create a new Python dictionary object with keys corresponding to each column defined for the relevant table in the schema. This ensures that the data is no longer nested, and keeps only the data needed for the tables.
这里的主要步骤是遍历之前创建的JSON对象。 对于每个结果,创建一个新的Python字典对象,其键对应于为架构中相关表定义的每一列。 这样可以确保不再嵌套数据,并仅保留表所需的数据。
The other step is to use Python’s datetime
module to manipulate dates, and transform them into DateTime
type objects that can be written to the database. For the sake of this example, random DateTime
objects are generated using the timedelta()
method from Python’s DateTime module.
另一个步骤是使用Python的datetime
模块来处理日期,并将其转换为可以写入数据库的DateTime
类型的对象。 对于本示例,将使用Python的DateTime模块中的timedelta()
方法生成随机的DateTime
对象。
Each created dictionary is appended to a list, which will be used in the final step of the pipeline.
每个创建的字典都会附加到列表中,该列表将在管道的最后一步中使用。
加载 (Load)
Finally, the data is in a form that can be loaded into the database. SQLAlchemy makes this step straightforward through its Session API.
最后,数据采用可以加载到数据库中的形式。 SQLAlchemy通过其Session API使这一步骤变得简单。
The Session API acts a bit like a middleman, or “holding zone,” for Python objects you have either loaded from or associated with the database. These objects can be manipulated within the session before being committed to the database.
对于从数据库加载或与数据库关联的Python对象,Session API的行为有点像中间人或“保留区”。 这些对象可以在提交给数据库之前在会话中进行操作。
The code below creates a new session object, adds rows to it, then merges and commits them to the database:
下面的代码创建一个新的会话对象,向其中添加行,然后合并并将它们提交到数据库:
Session = sessionmaker(bind=engine)
session = Session()
for user in users:
row = Users(**user)
session.add(row)
for upload in uploads:
row = Uploads(**upload)
session.add(row)
session.commit()
The sessionmaker
factory is used to generate newly-configured Session
classes. Session
is an everyday Python class that is instantiated on the second line as session
.
sessionmaker
工厂用于生成新配置的Session
类。 Session
是日常的Python类,在第二行将其实例化为session
。
Next up are two loops which iterate through the users
and uploads
lists created earlier. The elements of these lists are dictionary objects whose keys correspond to the columns given in the Users
and Uploads
classes defined previously.
接下来是两个循环,这些循环遍历users
并uploads
先前创建的列表。 这些列表的元素是字典对象,其关键字对应于先前定义的“ Users
和“上Uploads
类中给定的列。
Each object is used to instantiate a new instance of the relevant class (using Python’s handy some_function(**some_dict)
trick). This object is added to the current session with session.add()
.
每个对象用于实例化相关类的新实例(使用Python方便的some_function(**some_dict)
技巧)。 该对象通过session.add()
添加到当前会话中。
Finally, when the session contains the rows to be added, session.commit()
is used to commit the transaction to the database.
最后,当会话包含要添加的行时, session.commit()
用于将事务提交到数据库。
汇总 (Aggregating)
Another cool feature of SQLAlchemy is the ability to use its Expression Language system to write and execute backend-agnostic SQL queries.
SQLAlchemy的另一个很酷的功能是能够使用其Expression Language系统编写和执行与后端无关SQL查询。
What are the advantages of writing backend-agnostic queries? For a start, they make any future migration projects a whole lot easier. Different versions of SQL have somewhat incompatible syntaxes, but SQLAlchemy’s Expression Language acts as a lingua franca between them.
编写与后端无关的查询有什么优势? 首先,它们使将来的任何迁移项目变得更加容易。 不同版本SQL语法有些不兼容,但是SQLAlchemy的Expression Language充当它们之间的通用语言。
Also, being able to query and interact with your database in a seamlessly Pythonic way is a real advantage to developers who’d prefer work entirely in the language they know best. However, SQLAlchemy will also let you work in plain SQL, for cases when it is simpler to use a pre-written query.
此外,对于希望完全使用他们最了解的语言工作的开发人员而言,能够以无缝的Python语言方式查询数据库并与之交互是一个真正的优势。 但是,在使用预先编写的查询更简单的情况下,SQLAlchemy还可以让您使用纯SQL。
Here, we will extend the fictional blogging platform example to illustrate how this works. Once the basic Users and Uploads tables have been created and populated, a next step might be to create an aggregated table — for instance, showing how many articles each user has posted, and the time they were last active.
在这里,我们将扩展虚构的博客平台示例,以说明其工作原理。 创建并填充基本的“用户和上载”表后,下一步可能是创建汇总表-例如,显示每个用户发布了多少文章以及他们上次活动的时间。
First, define a class for the aggregated table:
首先,为汇总表定义一个类:
class UploadCounts(Base):
__tablename__ = "upload_counts"
UserId = Column(Integer, primary_key=True)
LastActive = Column(DateTime)
PostCount = Column(Integer)
UploadCounts.__table__.create(bind=engine, checkfirst=True)
This table will have three columns. For each UserId
, it will store the timestamp of when they were last active, and a count of how many posts they have uploaded.
该表将包含三列。 对于每个UserId
,它将存储上次激活时间的时间戳以及上载多少帖子的计数。
In plain SQL, this table would be populated using a query along the lines of:
在普通SQL中,将使用查询填充该表,包括:
INSERT INTO upload_counts
SELECT
UserId,
MAX(Timestamp) AS LastActive,
COUNT(UploadId) AS PostCount
FROM
uploads
GROUP BY 1;
In SQLAlchemy, this would be written as:
在SQLAlchemy中,这将写为:
connection = engine.connect()
query = select([Uploads.UserId,
func.max(Uploads.Timestamp).label('LastActive'),
func.count(Uploads.UploadId).label('PostCount')]).
group_by('UserId')
results = connection.execute(query)
for result in results:
row = UploadCounts(**result)
session.add(row)
session.commit()
The first line creates a Connection
object using the engine
object’s connect()
method. Next, a query is defined using the select()
function.
第一行使用engine
对象的connect()
方法创建一个Connection
对象。 接下来,使用select()
函数定义查询。
This query is the same as the plain SQL version given above. It selects the UserId
column from the uploads
table. It also applies func.max()
to the Timestamp
column, which identifies the most recent timestamp. This is labelled LastActive
using the label()
method.
此查询与上面给出的普通SQL版本相同。 它从uploads
表中选择UserId
列。 还将func.max()
应用于“ Timestamp
列,该列标识最近的时间戳。 使用label()
方法将其label()
为LastActive
。
Likewise, the query applies func.count()
to count the number of records that appear in the Title
column. This is labelled PostCount
.
同样,该查询将应用func.count()
来计算出现在“ Title
列中的记录数。 这被标记为PostCount
。
Finally, the query uses group_by()
to group results by UserId
.
最后,查询使用group_by()
将结果按UserId
分组。
To use the results of the query, a for loop iterates over the row objects returned by connection.execute(query)
. Each row is used to instantiate an instance of the UploadCounts
table class. As before, each row is added to the session
object, and finally the session is committed to the database.
要使用查询结果,for循环遍历由connection.execute(query)
返回的行对象。 每行用于实例化UploadCounts
表类的实例。 和以前一样,将每一行添加到session
对象,最后将会话提交到数据库。
签出 (Checking out)
Once you have run this script, you may want to convince yourself that the data have been written correctly into the demo.db
database created earlier.
一旦运行了该脚本,您可能想使自己确信数据已正确写入到先前创建的demo.db
数据库中。
After quitting Python, open the database in SQLite:
退出Python后,在SQLite中打开数据库:
$ sqlite3 demo.db
Now, you should be able to run the following queries:
现在,您应该能够运行以下查询:
SELECT * FROM users;
SELECT * FROM uploads;
SELECT * FROM upload_counts;
And the contents of each table will be printed to the console! By scheduling the Python script to run at regular intervals, you can be sure the database will be kept up-to-date.
每个表的内容将被打印到控制台! 通过安排Python脚本定期运行,可以确保数据库保持最新状态。
You could now use these tables to write queries for further analysis, or to build dashboards for visualisation purposes.
现在,您可以使用这些表编写查询以进行进一步分析,或者构建仪表板以进行可视化。
进一步阅读 (Reading further)
If you’ve made it this far, then hopefully you’ll have learned a thing or two about how SQLAlchemy can make ETL development in Python much more straightforward!
如果到此为止,那么希望您能学到一两个关于SQLAlchemy如何使Python中的ETL开发更加简单的事情!
It is not possible for a single article to do full justice to all the features of SQLAlchemy. However, one of the project’s key advantages is the depth and detail of its documentation. You can dive into it here.
单篇文章不可能对SQLAlchemy的所有功能都做到十全十美。 但是,该项目的主要优势之一是其文档的深度和细节。 你可以在这里潜入。
Otherwise, check out this cheatsheet if you want to get started quickly.
否则,如果您想快速入门,请查看此备忘单 。
The full code for this article can be found in this gist.
本文的完整代码可在本要点中找到。
Thanks for reading! If you have any questions or comments, please leave a response below.
谢谢阅读! 如果您有任何疑问或意见,请在下面留下答复。
翻译自: https://www.freecodecamp.org/news/sqlalchemy-makes-etl-magically-easy-ab2bd0df928/
sqlalchemy
最后
以上就是饱满短靴为你收集整理的sqlalchemy_SQLAlchemy使ETL变得异常简单的全部内容,希望文章能够帮你解决sqlalchemy_SQLAlchemy使ETL变得异常简单所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复