[PYTHON] I was addicted to multiprocessing + psycopg2

What I wanted to do

I wanted to parallelize the process of executing pd.read_sql multiple times with multiprocessing.

Versions

Name Version
python 3.7.3
pandas 0.24.2
numpy 1.16.2
psycopg2-binary 2.8.4
PostgreSQL 11.5

Success example (non-parallel ver.)

code

import multiprocessing
import pandas as pd
import numpy as np
import psycopg2

def get_connection():
	connection = psycopg2.connect(
						host='hostname',
						user='username',
						database='databasename',
						password='password')
	return connection

def function():
	for fuga in hoge:

		# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Some processing including

	return

if __name__ == '__main__':
	with get_connection() as conn:
		hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())

		function

Failure example

When I rewrote the bottom of the code in the above success example with multiprocessing and modifiedfunction ()to eat variables accordingly, an error was output.

code

def function(i):
	fuga = hoge[i]

	# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Some processing including

	return

if __name__ == '__main__':
	with get_connection() as conn:
		hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())

		with multiprocessing.Pool(processes=64) as pool:
			for _ in pool.imap_unordered(function, range(len(hoge))):
				pass

Error excerpt

multiprocessing.pool.RemoteTraceback:

#Omission

psycopg2.OperationalError: lost synchronization with server: got message type

#Omission

psycopg2.InterfaceError: connection already closed

#Omission

pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback

#Omission

Success story

In the code of the above failure example, it could be avoided by writing the connection information in function ().

code

def function(i):
	with get_connection() as conn:
		fuga = hoge[i]

		# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Some processing including

	return

Caution

The following error will occur depending on the number of parallels and the processing content of function (), but it can be avoided by setting ʻexport OMP_NUM_THREADS = 1`.

OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.

Recommended Posts

I was addicted to multiprocessing + psycopg2
I was addicted to Flask on dotCloud
What I was addicted to Python autorun
[Introduction to json] No, I was addicted to it. .. .. ♬
I was addicted to scraping with Selenium (+ Python) in 2020
A story that I was addicted to at np.where
I was addicted to trying logging.getLogger in Flask 1.1.x
What I was addicted to when using Python tornado
[IOS] GIF animation with Pythonista3. I was addicted to it.
What I was addicted to when migrating Processing users to Python
[Fixed] I was addicted to alphanumeric judgment of Python strings
A story that I was addicted to calling Lambda from AWS Lambda.
The record I was addicted to when putting MeCab on Heroku
What I was addicted to when introducing ALE to Vim for Python
What I was addicted to with json.dumps in Python base64 encoding
A note I was addicted to when making a beep on Linux
Note that I was addicted to sklearn's missing value interpolation (Imputer)
A note I was addicted to when creating a table with SQLAlchemy
Two things I was addicted to building Django + Apache + Nginx on Windows
I was addicted to running tensorflow on GPU with NVIDIA driver 440 + CUDA 10.2
A story I was addicted to when inserting from Python to a PostgreSQL table
A story I was addicted to trying to install LightFM on Amazon Linux
I was addicted to creating a Python venv environment with VS Code
A story I was addicted to trying to get a video url with tweepy
I was addicted to not being able to use Markdown on pypi's long_description
The file name was bad in Python and I was addicted to import
[Python] I was addicted to not saving internal variables of lambda expressions
I was addicted to trying Cython with PyCharm, so make a note
I was able to recurse in Python: lambda
What I was addicted to when creating a web application in a windows environment
Three things I was addicted to when using Python and MySQL with Docker
Summary of points I was addicted to running Selenium on AWS Lambda (python)
Docker x visualization didn't work and I was addicted to it, so I summarized it!
A story that I was addicted to when I made SFTP communication with python
I set up TensowFlow and was addicted to it, so make a note
I was soberly addicted to calling awscli from a Python 2.7 script registered in crontab
Note that I was addicted to npm script not passing in the verification environment
I started to analyze
What I was addicted to when combining class inheritance and Joint Table Inheritance in SQLAlchemy
I tried to debug.
AttributeError: I was addicted to'module' object has no attribute'MyTestCase'
I tried to paste
I was able to repeat it in Python: lambda
When I tried to install PIL and matplotlib in a virtualenv environment, I was addicted to it.
What I was addicted to when dealing with huge files in a Linux 32bit environment
Memo (March 2020) that I was addicted to when installing Arch Linux on MacBook Air 11'Early 2015
The story I was addicted to when I specified nil as a function argument in Go
I was able to implement web app authentication with flask-login
PyTorch's book was difficult to understand, so I supplemented it
I was a little addicted to installing Python3.3 + mod_wsgi3.4 on Sakura VPS (CentOS), so a retrospective memo
When I tried to scrape using requests in python, I was addicted to SSLError, so a workaround memo
I was addicted to deploying GoogleCloudFunctions ('ascii' codec can't encode character u'\ u281b' in position 58 appears)
I wrote AWS Lambda, and I was a little addicted to the default value of Python arguments
I tried to learn PredNet
I tried to organize SVM.
I talked to Raspberry Pi
Note that I was addicted to accessing the DB with Python's mysql.connector using a web application.
I tried to implement PCANet
Introduction to Nonlinear Optimization (I)
I tried to reintroduce Linux
I tried to introduce Pylint