Ich wollte den Prozess der mehrfachen Ausführung von "pd.read_sql" mit "Multiprocessing" parallelisieren.
Versions
Name | Version |
---|---|
python | 3.7.3 |
pandas | 0.24.2 |
numpy | 1.16.2 |
psycopg2-binary | 2.8.4 |
PostgreSQL | 11.5 |
import multiprocessing
import pandas as pd
import numpy as np
import psycopg2
def get_connection():
connection = psycopg2.connect(
host='hostname',
user='username',
database='databasename',
password='password')
return connection
def function():
for fuga in hoge:
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Einige Verarbeitung einschließlich
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
function
Als ich den unteren Teil des Codes im obigen Erfolgsbeispiel mit "Multiprocessing" und "Modified" -Funktion () "umschrieb, damit die Variablen entsprechend gegessen werden können, wurde ein Fehler ausgegeben.
def function(i):
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Einige Verarbeitung einschließlich
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
with multiprocessing.Pool(processes=64) as pool:
for _ in pool.imap_unordered(function, range(len(hoge))):
pass
multiprocessing.pool.RemoteTraceback:
#Unterlassung
psycopg2.OperationalError: lost synchronization with server: got message type
#Unterlassung
psycopg2.InterfaceError: connection already closed
#Unterlassung
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
#Unterlassung
Im Code des obigen Fehlerbeispiels könnte dies vermieden werden, indem die Verbindungsinformationen in function ()
geschrieben werden.
def function(i):
with get_connection() as conn:
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Einige Verarbeitung einschließlich
return
Der folgende Fehler tritt abhängig von der Anzahl der Parallelen und dem Verarbeitungsinhalt von function ()
auf. Er kann jedoch vermieden werden, indem export OMP_NUM_THREADS = 1
gesetzt wird.
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.
Recommended Posts