Created by CyanHall.com on 11/22/2020 , Last updated: 03/30/2021.
👉 github shields Star me if it's helpful.

1. Driver

    # psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]

# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]

# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
  

2. SampleModel

    import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class SampleModel(Base):
    """Example declarative model mapped to the ``samples`` table."""

    __tablename__ = 'samples'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name; 'xx' is used when no value is supplied.
    name = Column(
        String(32),
        default='xx',
        nullable=False,
        index=True,
    )
    # Optional e-mail address, unique across rows.
    email = Column(String(32), unique=True)
    # The callable itself is passed as the default (not its result);
    # datetime.datetime.now yields a naive local timestamp.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Free-form text; may be NULL.
    extra = Column(Text, nullable=True)
  

3. Query one

    # Return the first row matching the id filter.
    # `id` is a placeholder for the caller's key value; unless it is bound
    # locally it refers to the builtin `id`.
    SampleModel.query.filter(SampleModel.id == id).first()
  

4. Query all

    # Return every row matching the filter (`id` is a placeholder value).
    SampleModel.query.filter(SampleModel.id == id).all()
  

5. Count Model

    # Count the rows matching the filter (`id` is a placeholder value).
    SampleModel.query.filter(SampleModel.id == id).count()
  

6. Query one and delete

    # Option 1 - bulk delete: remove every row matching the filter through the query.
    SampleModel.query.filter(SampleModel.id == id).delete()
# Option 2 - per-object delete: mark an already loaded instance for deletion.
# `obj` is not defined in this snippet; it must be fetched first.
# NOTE(review): neither option commits here - presumably db.session.commit() follows.
db.session.delete(obj)
  

7. Query one and update

    # Bulk update: set is_finish to 1 on every row matching the filter.
    # synchronize_session=False: per the SQLAlchemy docs the session's loaded
    # objects are not refreshed, so they may hold stale values afterwards.
    # NOTE(review): `is_finish` is not among the SampleModel columns shown in
    # this sheet - presumably illustrative.
    SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session=False)
  

8. order_by

    # All rows ordered by `total`, descending.
    # NOTE(review): `total` is not among the SampleModel columns shown in this
    # sheet - presumably illustrative.
    SampleModel.query.order_by(SampleModel.total.desc()).all()
  

9. id in list

    # Rows whose id is contained in `filter_ids` (SQL IN).
    # presumably `filter_ids` is a list of primary-key values - verify against caller.
    SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
  

10. Row lock

    # with_for_update() adds FOR UPDATE to the SELECT so the matched row is
    # locked; NOTE(review): this only has an effect inside a transaction and
    # on backends that support row locks - confirm for the target database.
    SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
  

11. label

    # `SampleModel.query` is already a Query object and cannot be called with
    # columns; select the labelled column through the session instead.
    # The result row exposes the value as `alias_name`.
    db.session.query(SampleModel.name.label('alias_name')).first()
  

12. or_

    from sqlalchemy import or_
# First row where EITHER condition holds (SQL OR).
# `id` and `name` are placeholders for the caller's values.
SampleModel.query.filter(
  or_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

13. and_

    from sqlalchemy import and_
# First row where BOTH conditions hold (SQL AND).
# `id` and `name` are placeholders for the caller's values.
SampleModel.query.filter(
  and_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

14. func.avg and func.count

    from sqlalchemy import func
# Aggregate per SampleModel row: average price and number of orders.
# `SampleModel.query` is a Query object and is not callable, so the
# aggregate columns are selected through the session instead.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.count(SampleModel.order_id).label('order_count')
).outerjoin(
    Order, Order.order_id == SampleModel.order_id
).group_by(
    SampleModel.id
).all()
  

15. func.group_concat

    from sqlalchemy import func
# Per SampleModel row: average price plus the distinct role names joined
# into one string.
# `SampleModel.query` is a Query object and is not callable, so the columns
# are selected through the session instead.
# NOTE(review): group_concat is backend-specific (e.g. MySQL/SQLite) -
# confirm the target database supports it.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.group_concat(SampleRole.name.distinct())
).outerjoin(
    SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
    SampleModel.id
).all()
  

16. func.concat

    from sqlalchemy import func
# First row whose concatenated "first last" name equals 'First Last'.
# The comparison is a filter criterion, so it goes through .filter();
# `SampleModel.query` itself is a Query object and cannot be called.
SampleModel.query.filter(
    func.concat(SampleModel.first_name, ' ', SampleModel.last_name) == 'First Last'
).first()
  

17. keyword filter

    filter_keywords = "some name"
# Select every SampleModel1 column plus SampleModel2.name2, keeping
# SampleModel1 rows that have no matching SampleModel2 row (outer join).
q = db.session.query(
    *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(SampleModel2, SampleModel2.some_id == SampleModel1.some_id)
# Only add the LIKE clauses when a non-blank keyword was supplied.
# NOTE: `%` and `_` inside filter_keywords act as LIKE wildcards.
if filter_keywords.strip():
    q = q.filter(
        or_(
            SampleModel1.name1.like(f"%{filter_keywords}%"),
            SampleModel2.name2.like(f"%{filter_keywords}%")
        )
    )
  

18. Date filter

    # Keep deliveries shipped on or after `start_date` ("YYYY-MM-DD" string).
    # The parsed value is reduced to a date so a DATE is compared with a DATE;
    # comparing against a full datetime can drop the start day itself on
    # backends that compare the values as text (e.g. SQLite).
    q = q.filter(func.date(ShopDelivery.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d").date())
  

19. dynamic filter

    filter_exps = []
# One LIKE clause per comma-separated entry of `words`, OR-ed together.
filter_exps.extend(
    User.name.like("%{}%".format(term)) for term in words.split(',')
)
q = q.filter(or_(*filter_exps))
  

20. alias

    from sqlalchemy.orm import aliased
# Two independent aliases so User can be joined twice in the same query.
user1 = aliased(User)
user2 = aliased(User)
# Select every Order column plus the names of both related users.
# The join conditions reference Order, the same model whose columns are
# selected (the original mixed Order with ShopOrder).
q = db.session.query(
    *Order.__table__.columns, user1.name.label('name1'), user2.name.label('name2')
).outerjoin(
    user1, user1.user_id == Order.user1_id
).outerjoin(
    user2, user2.user_id == Order.user2_id
).first()
  

21. flush

    model = SampleModel()
# Copy each key/value pair of `info` onto the new model as an attribute.
# presumably `info` maps column names to values - verify against caller.
for k, v in info.items():
    setattr(model, k, v)
db.session.add(model)
# flush() sends the pending changes to the database inside the current
# transaction without committing (per the SQLAlchemy Session docs).
# NOTE(review): a commit is still required to persist the row.
db.session.flush()
  

1. Driver

    # psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]

# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]

# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
  

3. Query one

    # Return the first row matching the id filter.
    # `id` is a placeholder for the caller's key value; unless it is bound
    # locally it refers to the builtin `id`.
    SampleModel.query.filter(SampleModel.id == id).first()
  

5. Count Model

    # Count the rows matching the filter (`id` is a placeholder value).
    SampleModel.query.filter(SampleModel.id == id).count()
  

7. Query one and update

    # Bulk update: set is_finish to 1 on every row matching the filter.
    # synchronize_session=False: per the SQLAlchemy docs the session's loaded
    # objects are not refreshed, so they may hold stale values afterwards.
    # NOTE(review): `is_finish` is not among the SampleModel columns shown in
    # this sheet - presumably illustrative.
    SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session=False)
  

9. id in list

    # Rows whose id is contained in `filter_ids` (SQL IN).
    # presumably `filter_ids` is a list of primary-key values - verify against caller.
    SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
  

11. label

    # `SampleModel.query` is already a Query object and cannot be called with
    # columns; select the labelled column through the session instead.
    # The result row exposes the value as `alias_name`.
    db.session.query(SampleModel.name.label('alias_name')).first()
  

13. and_

    from sqlalchemy import and_
# First row where BOTH conditions hold (SQL AND).
# `id` and `name` are placeholders for the caller's values.
SampleModel.query.filter(
  and_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

15. func.group_concat

    from sqlalchemy import func
# Per SampleModel row: average price plus the distinct role names joined
# into one string.
# `SampleModel.query` is a Query object and is not callable, so the columns
# are selected through the session instead.
# NOTE(review): group_concat is backend-specific (e.g. MySQL/SQLite) -
# confirm the target database supports it.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.group_concat(SampleRole.name.distinct())
).outerjoin(
    SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
    SampleModel.id
).all()
  

17. keyword filter

    filter_keywords = "some name"
# Select every SampleModel1 column plus SampleModel2.name2, keeping
# SampleModel1 rows that have no matching SampleModel2 row (outer join).
q = db.session.query(
    *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(SampleModel2, SampleModel2.some_id == SampleModel1.some_id)
# Only add the LIKE clauses when a non-blank keyword was supplied.
# NOTE: `%` and `_` inside filter_keywords act as LIKE wildcards.
if filter_keywords.strip():
    q = q.filter(
        or_(
            SampleModel1.name1.like(f"%{filter_keywords}%"),
            SampleModel2.name2.like(f"%{filter_keywords}%")
        )
    )
  

19. dynamic filter

    filter_exps = []
# One LIKE clause per comma-separated entry of `words`, OR-ed together.
filter_exps.extend(
    User.name.like("%{}%".format(term)) for term in words.split(',')
)
q = q.filter(or_(*filter_exps))
  

21. flush

    model = SampleModel()
# Copy each key/value pair of `info` onto the new model as an attribute.
# presumably `info` maps column names to values - verify against caller.
for k, v in info.items():
    setattr(model, k, v)
db.session.add(model)
# flush() sends the pending changes to the database inside the current
# transaction without committing (per the SQLAlchemy Session docs).
# NOTE(review): a commit is still required to persist the row.
db.session.flush()
  

2. SampleModel

    import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class SampleModel(Base):
    """Example declarative model mapped to the ``samples`` table."""

    __tablename__ = 'samples'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name; 'xx' is used when no value is supplied.
    name = Column(
        String(32),
        default='xx',
        nullable=False,
        index=True,
    )
    # Optional e-mail address, unique across rows.
    email = Column(String(32), unique=True)
    # The callable itself is passed as the default (not its result);
    # datetime.datetime.now yields a naive local timestamp.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Free-form text; may be NULL.
    extra = Column(Text, nullable=True)
  

4. Query all

    # Return every row matching the filter (`id` is a placeholder value).
    SampleModel.query.filter(SampleModel.id == id).all()
  

6. Query one and delete

    # Option 1 - bulk delete: remove every row matching the filter through the query.
    SampleModel.query.filter(SampleModel.id == id).delete()
# Option 2 - per-object delete: mark an already loaded instance for deletion.
# `obj` is not defined in this snippet; it must be fetched first.
# NOTE(review): neither option commits here - presumably db.session.commit() follows.
db.session.delete(obj)
  

8. order_by

    # All rows ordered by `total`, descending.
    # NOTE(review): `total` is not among the SampleModel columns shown in this
    # sheet - presumably illustrative.
    SampleModel.query.order_by(SampleModel.total.desc()).all()
  

10. Row lock

    # with_for_update() adds FOR UPDATE to the SELECT so the matched row is
    # locked; NOTE(review): this only has an effect inside a transaction and
    # on backends that support row locks - confirm for the target database.
    SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
  

12. or_

    from sqlalchemy import or_
# First row where EITHER condition holds (SQL OR).
# `id` and `name` are placeholders for the caller's values.
SampleModel.query.filter(
  or_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

14. func.avg and func.count

    from sqlalchemy import func
# Aggregate per SampleModel row: average price and number of orders.
# `SampleModel.query` is a Query object and is not callable, so the
# aggregate columns are selected through the session instead.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.count(SampleModel.order_id).label('order_count')
).outerjoin(
    Order, Order.order_id == SampleModel.order_id
).group_by(
    SampleModel.id
).all()
  

16. func.concat

    from sqlalchemy import func
# First row whose concatenated "first last" name equals 'First Last'.
# The comparison is a filter criterion, so it goes through .filter();
# `SampleModel.query` itself is a Query object and cannot be called.
SampleModel.query.filter(
    func.concat(SampleModel.first_name, ' ', SampleModel.last_name) == 'First Last'
).first()
  

18. Date filter

    # Keep deliveries shipped on or after `start_date` ("YYYY-MM-DD" string).
    # The parsed value is reduced to a date so a DATE is compared with a DATE;
    # comparing against a full datetime can drop the start day itself on
    # backends that compare the values as text (e.g. SQLite).
    q = q.filter(func.date(ShopDelivery.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d").date())
  

20. alias

    from sqlalchemy.orm import aliased
# Two independent aliases so User can be joined twice in the same query.
user1 = aliased(User)
user2 = aliased(User)
# Select every Order column plus the names of both related users.
# The join conditions reference Order, the same model whose columns are
# selected (the original mixed Order with ShopOrder).
q = db.session.query(
    *Order.__table__.columns, user1.name.label('name1'), user2.name.label('name2')
).outerjoin(
    user1, user1.user_id == Order.user1_id
).outerjoin(
    user2, user2.user_id == Order.user2_id
).first()
  


Maintained by Cyanhall.com, Copyright @ CC BY-NC-SA 4.0