Bei einem bestimmten OSS-Code sah etwas gut aus. Stellen Sie ihn daher bitte vor.
Ein bestimmtes OSS ist Apache Airflow. Unter ihnen war airflow / utils / session.py ein gutes Gefühl.
Zuerst von session.py
.
import contextlib
from functools import wraps
from airflow import settings
# contextlib.Wenn Sie den Kontextmanager angeben, wird er automatisch mit mit geschlossen.
@contextlib.contextmanager
def create_session():
"""
Contextmanager that will create and teardown a session.
"""
session = settings.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
#Ich möchte Sitzung verwenden. Es ergänzt Sitzung gut, wenn es in der Funktion verwendet wird
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
@wraps(func)
def wrapper(*args, **kwargs):
arg_session = 'session'
func_params = func.__code__.co_varnames
session_in_args = arg_session in func_params and \
func_params.index(arg_session) < len(args)
session_in_kwargs = arg_session in kwargs
#Wenn das Argument der Funktion eine Sitzung enthält, verwenden Sie sie. Wenn nicht, erstellen Sie sie
if session_in_kwargs or session_in_args:
return func(*args, **kwargs)
else:
with create_session() as session:
kwargs[arg_session] = session
return func(*args, **kwargs)
return wrapper
Also der Code, der als nächstes verwendet werden soll. Airflow / models / baseoperator.py Zeile 940 /airflow/blob/master/airflow/models/baseoperator.py#L940)
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: Session = None) -> List[TaskInstance]:
"""
Get a set of task instance related to this task for a specific date
range.
"""
end_date = end_date or timezone.utcnow()
return session.query(TaskInstance)\
.filter(TaskInstance.dag_id == self.dag_id)\
.filter(TaskInstance.task_id == self.task_id)\
.filter(TaskInstance.execution_date >= start_date)\
.filter(TaskInstance.execution_date <= end_date)\
.order_by(TaskInstance.execution_date)\
.all()
Wenn Sie es als Dekorateur mit einer Funktion angeben, die "session" like verwendet Wenn im Anrufer keine "Sitzung" festgelegt ist, wird sie neu erstellt (und am Ende geschlossen). Wenn Sie "Sitzung" eingestellt haben, können Sie ein angenehmes Gefühl erzielen, wenn Sie es so verwenden, wie es ist.
Ich denke, diese Methode kann auf verschiedene andere Arten als die DB-Verbindung angewendet werden.
Recommended Posts