En regardant un certain code OSS, il y avait quelque chose qui semblait bon, alors veuillez le présenter.
Un certain OSS est Apache Airflow. Parmi eux, airflow / utils / session.py était un bon sentiment.
Tout d'abord, à partir de session.py
.
import contextlib
from functools import wraps
from airflow import settings
# contextlib.Si vous spécifiez contextmanager, il se fermera automatiquement en utilisant avec.
@contextlib.contextmanager
def create_session():
"""
Contextmanager that will create and teardown a session.
"""
session = settings.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
#Je veux utiliser Session Il complète bien Session lorsqu'il est utilisé en fonction
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
@wraps(func)
def wrapper(*args, **kwargs):
arg_session = 'session'
func_params = func.__code__.co_varnames
session_in_args = arg_session in func_params and \
func_params.index(arg_session) < len(args)
session_in_kwargs = arg_session in kwargs
#S'il y a une session dans l'argument de fonction, utilisez-la, sinon créez-la
if session_in_kwargs or session_in_args:
return func(*args, **kwargs)
else:
with create_session() as session:
kwargs[arg_session] = session
return func(*args, **kwargs)
return wrapper
Donc, le code à utiliser ensuite. Ligne 940 de airflow / models / baseoperator.py /airflow/blob/master/airflow/models/baseoperator.py#L940)
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: Session = None) -> List[TaskInstance]:
"""
Get a set of task instance related to this task for a specific date
range.
"""
end_date = end_date or timezone.utcnow()
return session.query(TaskInstance)\
.filter(TaskInstance.dag_id == self.dag_id)\
.filter(TaskInstance.task_id == self.task_id)\
.filter(TaskInstance.execution_date >= start_date)\
.filter(TaskInstance.execution_date <= end_date)\
.order_by(TaskInstance.execution_date)\
.all()
Si vous le spécifiez en tant que décorateur avec une fonction qui utilise session
comme
Si session
n'est pas définie dans l'appelant, elle sera nouvellement créée (et sera fermée à la fin).
Si vous avez défini session
, vous pouvez obtenir une agréable sensation de l'utiliser telle quelle.
Je pense que cette méthode peut être appliquée de différentes manières autres que la connexion DB.
Recommended Posts