En regardant un certain code OSS, il y avait quelque chose qui semblait bon, alors veuillez le présenter.
Un certain OSS est Apache Airflow. Parmi eux, airflow / utils / session.py était un bon sentiment.
Tout d'abord, à partir de session.py.
import contextlib
from functools import wraps
from airflow import settings
# contextlib.Si vous spécifiez contextmanager, il se fermera automatiquement en utilisant avec.
@contextlib.contextmanager
def create_session():
"""
Contextmanager that will create and teardown a session.
"""
session = settings.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
#Je veux utiliser Session Il complète bien Session lorsqu'il est utilisé en fonction
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
@wraps(func)
def wrapper(*args, **kwargs):
arg_session = 'session'
func_params = func.__code__.co_varnames
session_in_args = arg_session in func_params and \
func_params.index(arg_session) < len(args)
session_in_kwargs = arg_session in kwargs
#S'il y a une session dans l'argument de fonction, utilisez-la, sinon créez-la
if session_in_kwargs or session_in_args:
return func(*args, **kwargs)
else:
with create_session() as session:
kwargs[arg_session] = session
return func(*args, **kwargs)
return wrapper
Donc, le code à utiliser ensuite. Ligne 940 de airflow / models / baseoperator.py /airflow/blob/master/airflow/models/baseoperator.py#L940)
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: Session = None) -> List[TaskInstance]:
"""
Get a set of task instance related to this task for a specific date
range.
"""
end_date = end_date or timezone.utcnow()
return session.query(TaskInstance)\
.filter(TaskInstance.dag_id == self.dag_id)\
.filter(TaskInstance.task_id == self.task_id)\
.filter(TaskInstance.execution_date >= start_date)\
.filter(TaskInstance.execution_date <= end_date)\
.order_by(TaskInstance.execution_date)\
.all()
Si vous le spécifiez en tant que décorateur avec une fonction qui utilise session comme
Si session n'est pas définie dans l'appelant, elle sera nouvellement créée (et sera fermée à la fin).
Si vous avez défini session, vous pouvez obtenir une agréable sensation de l'utiliser telle quelle.
Je pense que cette méthode peut être appliquée de différentes manières autres que la connexion DB.
Recommended Posts