概述
SQLAlchemy
Python 连接数据库(连接 URL 的格式)
使用 pymysql 驱动时:mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>](注意:下文示例使用的是 mysqldb 驱动,即 mysql+mysqldb://…,两者只是驱动名不同)
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 1 example: send raw SQL strings straight through the Engine.
from sqlalchemy import create_engine

DB_URL = "mysql+mysqldb://root:123@127.0.0.1:3306/s11"
engine = create_engine(DB_URL, max_overflow=5)

# 1) A statement with the values written directly into the SQL text.
literal_insert = "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
engine.execute(literal_insert)

# 2) Positional placeholders with two parameter tuples (one per row).
positional_insert = "INSERT INTO ts_test (a, b) VALUES (%s, %s)"
positional_rows = ((555, "v1"), (666, "v1"),)
engine.execute(positional_insert, positional_rows)

# 3) Named placeholders, with the values passed as keyword arguments.
named_insert = "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)"
engine.execute(named_insert, id=999, name="v1")

# Read everything back; fetchall() returns the rows (not printed here).
result = engine.execute('select * from ts_test')
result.fetchall()
事务操作
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Transaction examples: two ways to run statements inside a transaction.
# The SQL strings below are placeholders, not runnable statements.
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Style 1: let the engine hand out a connection with a transaction already
# begun. Per the SQLAlchemy docs the block commits on normal exit and rolls
# back if an exception escapes.
with engine.begin() as conn:
    conn.execute("insert into table (x, y, z) values (1, 2, 3)")
    conn.execute("my_special_procedure(5)")

# Style 2: check out a connection explicitly, then begin a transaction on it.
# The connection is used as a context manager so it is always closed
# (returned to the pool) afterwards; the original left it open.
with engine.connect() as conn:
    with conn.begin():
        conn.execute("some statement", {'x':5, 'y':10})
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 2 example: describe tables with Table/Column objects and create them.
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)


def _id_name_table(table_name):
    """Register a table with an integer primary key ``id`` and a ``name``."""
    return Table(
        table_name, metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )


# Both tables share the same two-column layout.
user = _id_name_table('user')
color = _id_name_table('color')

# Emit CREATE TABLE for every table registered on this MetaData.
metadata.create_all(engine)

# metadata.clear()
# metadata.remove()
增删改查
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# CRUD example built on the SQL Expression Language.
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

user = Table(
    'user',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

color = Table(
    'color',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

conn = engine.connect()

# Build the SQL statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
insert_stmt = user.insert()
new_row = {'id':7,'name':'seven'}
conn.execute(insert_stmt, new_row)
conn.close()

# Further examples (left disabled; the select ones also need
# `from sqlalchemy import select`):
#
# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()
#
# sql = user.delete().where(user.c.id > 1)
#
# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')
#
# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)
#
# result = conn.execute(sql)
# print(result.fetchall())
# conn.close()
注:SQLAlchemy 本身不提供修改已有表结构(即数据库迁移)的功能;如有需要,可以使用 SQLAlchemy 作者开源的另一个工具 Alembic 来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 3 example: map a class to a table and work through an ORM Session.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Base = declarative_base()
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Find every subclass of Base and create the matching tables in the database.
# Base.metadata.create_all(engine)

# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()


# ########## Create ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
#     User(id=3, name='sb'),
#     User(id=4, name='sb')
# ])
# session.commit()

# ########## Delete ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()

# ########## Update ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()

# ########## Query ##########
# ret = session.query(User).filter_by(name='sb').first()

# ret = session.query(User).filter_by(name='sb').all()
# print(ret)

# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print(ret)

# ret = session.query(User.name.label('name_label')).all()
# print(ret, type(ret))

# ret = session.query(User).order_by(User.id).all()
# print(ret)

# ret = session.query(User).order_by(User.id)[1:3]
# print(ret)
# session.commit()
另一种写法
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Alternative ORM layout: a scoped session, a commit/rollback context manager,
# an Account model and small CRUD helpers built on top of them.
from contextlib import contextmanager

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String
from sqlalchemy import create_engine
from sqlalchemy import or_,orm
from sqlalchemy.orm import scoped_session,sessionmaker

db_connect_string = 'mysql://root:123456@localhost:3306/testdb?charset=utf8'

# SSL options; defined but not passed to create_engine() below.
# NOTE(review): presumably meant to be supplied via connect_args -- confirm.
ssl_args={'ssl':{'cert':'/home//ssl/client-cert.pem',
                 'key': '/home//shouse/ssl/client-key.pem',
                 'ca': '/home//shouse/ssl/ca-cert.pem'
                 }}

engine = create_engine(db_connect_string,max_overflow=5)

# expire_on_commit=False keeps loaded attribute values readable on objects
# after the session that loaded them has committed.
Sesson = scoped_session(sessionmaker(bind=engine,expire_on_commit=False))


def getSesson():
    """Return the session managed by the scoped-session registry."""
    return Sesson()


@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations.

    Yields a session; commits when the block finishes normally, rolls back
    and re-raises when the block raises, and always closes the session.
    """
    session = Sesson()
    try:
        yield session
        session.commit()
    except BaseException:
        # Deliberately broad: undo the work on any failure, then re-raise.
        session.rollback()
        raise
    finally:
        session.close()


Base =declarative_base()


class Account(Base):
    """ORM model mapped to the ``account`` table."""

    __tablename__='account'

    id =Column(Integer,primary_key=True)
    user_name = Column(String(50),nullable=False)
    # NOTE(review): the demo below stores the password as given (plain text).
    password = Column(String(200),nullable=False)
    title = Column(String(50))
    salary = Column(Integer)

    def is_active(self):
        # Assume every user is active.
        return True

    def get_id(self):
        # Return the account id.
        return self.id

    def is_authenticated(self):
        # Assume the user has already been authenticated.
        return True

    def is_anoymous(self):
        # An account with a user name and password is not anonymous.
        return False

    # Correctly spelled alias; the original name is kept for existing callers.
    is_anonymous = is_anoymous


# Database operations.
# The `account` table must already exist; this script does not create it
# (Base.metadata.create_all(engine) would).

def InsertAccount(user,password,title,salary):
    """Insert a new account row with the given field values."""
    with session_scope() as session:
        # Use the Account model defined above; `sqlalchemy.orm` has no
        # `Account` attribute, so `orm.Account` raised AttributeError.
        account = Account(user_name=user,password=password,title=title,salary=salary)
        session.add(account)


def GetAccount(id=None,user_name=None):
    """Return the first account matching `id` or `user_name`, else None.

    Only the arguments actually supplied take part in the filter; when
    neither is given the function returns None without querying.
    """
    conditions = []
    if id is not None:
        conditions.append(Account.id == id)
    if user_name is not None:
        conditions.append(Account.user_name == user_name)
    if not conditions:
        return None
    with session_scope() as session:
        return session.query(Account).filter(or_(*conditions)).first()


def DeleteAccount(user_name):
    """Delete the account with the given user name, if it exists."""
    with session_scope() as session:
        # Look the row up in this same session rather than via GetAccount(),
        # whose own session_scope would commit and close the shared scoped
        # session in the middle of this operation.
        account = session.query(Account).filter(Account.user_name == user_name).first()
        if account:
            session.delete(account)


def UpdateAccount(id,user_name,password,title,salary):
    """Overwrite all editable fields of the account with primary key `id`.

    Does nothing when no such account exists.
    """
    with session_scope() as session:
        account = session.query(Account).filter(Account.id == id).first()
        if not account:
            return
        account.user_name = user_name
        account.password = password
        account.title = title
        account.salary = salary


# Demo calls.
InsertAccount('dawei','123','nb',3000)
InsertAccount('da','456','sb',5000)
GetAccount(2)
DeleteAccount('da')
UpdateAccount(1,'ad','789','2b',5000)
转载于:https://www.cnblogs.com/Erick-L/p/6995215.html
最后
以上就是寂寞乌龟为你收集整理的python 操作SQLAlchemy的全部内容,希望文章能够帮你解决python 操作SQLAlchemy所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复