While reading the source of an open-source project, I came across a pattern I liked, so here is a short introduction.
The project is Apache Airflow, and the part I liked is airflow/utils/session.py.
First, session.py itself.
import contextlib
from functools import wraps
from airflow import settings
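# Decorating a generator with contextlib.contextmanager lets callers use it in a
# `with` block; the code after the yield runs when the block exits, so the session is closed automatically.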
@contextlib.contextmanager
def create_session():
    """
    Contextmanager that will create and teardown a session.
    """
    session = settings.Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Decorator for functions that take a session argument: it supplies a session when the caller does not pass one.
def provide_session(func):
    """
    Function decorator that provides a session if it isn't provided.
    If you want to reuse a session or run the function as part of a
    database transaction, you pass it to the function, if not this wrapper
    will create one and close it for you.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        arg_session = 'session'
        func_params = func.__code__.co_varnames
        session_in_args = arg_session in func_params and \
            func_params.index(arg_session) < len(args)
        session_in_kwargs = arg_session in kwargs
        # If the caller already passed a session, use it as-is; otherwise create one
        if session_in_kwargs or session_in_args:
            return func(*args, **kwargs)
        else:
            with create_session() as session:
                kwargs[arg_session] = session
                return func(*args, **kwargs)
    return wrapper
Next, the code on the calling side: line 940 of airflow/models/baseoperator.py (/airflow/blob/master/airflow/models/baseoperator.py#L940).
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
                        end_date: Optional[datetime] = None,
                        session: Session = None) -> List[TaskInstance]:
    """
    Get a set of task instance related to this task for a specific date
    range.
    """
    end_date = end_date or timezone.utcnow()
    return session.query(TaskInstance)\
        .filter(TaskInstance.dag_id == self.dag_id)\
        .filter(TaskInstance.task_id == self.task_id)\
        .filter(TaskInstance.execution_date >= start_date)\
        .filter(TaskInstance.execution_date <= end_date)\
        .order_by(TaskInstance.execution_date)\
        .all()
Putting @provide_session on a function that takes a session argument, as above, gives you two behaviors.
If the caller does not pass session, a new one is created for the call, committed on success, and closed when the call finishes.
If the caller does pass session, that one is used as-is, and the caller stays responsible for committing and closing it.
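To make both behaviors concrete, here is a minimal sketch that can be run without Airflow. FakeSession, the created list, which_session, and failing are hypothetical names that are not in Airflow; FakeSession only records which lifecycle methods were called and stands in for settings.Session(). create_session and provide_session follow the code shown above.

```python
import contextlib
from functools import wraps


class FakeSession:
    """Hypothetical stand-in for settings.Session(); records lifecycle calls."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")


created = []  # every session opened by create_session(), for inspection


@contextlib.contextmanager
def create_session():
    session = FakeSession()
    created.append(session)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        arg_session = "session"
        func_params = func.__code__.co_varnames
        session_in_args = (arg_session in func_params
                           and func_params.index(arg_session) < len(args))
        if arg_session in kwargs or session_in_args:
            return func(*args, **kwargs)
        with create_session() as session:
            kwargs[arg_session] = session
            return func(*args, **kwargs)
    return wrapper


@provide_session
def which_session(session=None):
    return session


@provide_session
def failing(session=None):
    raise ValueError("boom")


auto = which_session()              # no session passed: one is created and closed
mine = FakeSession()
same = which_session(session=mine)  # session passed: used as-is, left untouched

try:
    failing()                       # exception: the created session is rolled back
except ValueError:
    pass
rolled_back = created[-1]
```

In this sketch, auto.events ends up as ["commit", "close"], mine.events stays empty, and rolled_back.events is ["rollback", "close"].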
I think this approach can be applied to many kinds of resources besides DB connections.
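As one possible illustration, the sketch below applies the same idea to a temporary directory instead of a DB session. The provide helper and write_report are hypothetical names made up for this example and are not part of Airflow. Note also that this version uses inspect.signature(...).bind_partial to detect whether the argument was passed, rather than the co_varnames check from session.py.

```python
import inspect
import os
import tempfile
from functools import wraps


def provide(arg_name, factory):
    """Hypothetical generalization: inject `arg_name` from `factory()` when the caller omits it.

    `factory` must return a context manager; it is entered for the duration of the call.
    """
    def decorator(func):
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # bind_partial lists only the arguments the caller actually passed
            bound = sig.bind_partial(*args, **kwargs)
            if arg_name in bound.arguments:
                return func(*args, **kwargs)
            with factory() as resource:
                kwargs[arg_name] = resource
                return func(*args, **kwargs)
        return wrapper
    return decorator


@provide("workdir", tempfile.TemporaryDirectory)
def write_report(text, workdir=None):
    path = os.path.join(workdir, "report.txt")
    with open(path, "w") as f:
        f.write(text)
    return path


# No workdir passed: a temporary directory is created and removed around the call.
temp_path = write_report("hello")

# workdir passed: the caller's directory is used and its lifetime is the caller's business.
with tempfile.TemporaryDirectory() as own_dir:
    kept_path = write_report("hello", workdir=own_dir)
    kept_exists = os.path.exists(kept_path)
```

The design choice is the same as in provide_session: the function body never needs to know who owns the resource, only that it receives one.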