Ich habe versucht, den in "Abrufen von Bildern von der Flickr-API mit Python" (Teil 1) veröffentlichten Vorlagencode zu überarbeiten ist eine Fortsetzung. Der Prozess der Bilderfassung mit der Flickr-API ist abgeschlossen. Es ist jedoch schwierig, viele Bilder mit vielen Schlüsselwörtern zu erfassen, da sie nacheinander verarbeitet werden. Hier möchte ich es auf Parallelverarbeitung umstellen und prüfen, wie gut sich die Verarbeitungsgeschwindigkeit anfühlt.
from flickrapi import FlickrAPI
import requests
import os, time, sys
import configparser
import time
#Pfad des Bildordners
imgdir = os.path.join(os.getcwd(), "images")
#Verwenden Sie die Flickr-API
def request_flickr(keyword, count=100, license=None):
#Erstellen Sie einen verbundenen Client und führen Sie eine Suche durch
config = configparser.ConfigParser()
config.read('secret.ini')
flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
result = flickr.photos.search(
text = keyword, #Suchbegriff
per_page = count, #Anzahl der erfassten Daten
media = 'photos', #Sammle Fotos
sort = 'relevance', #Holen Sie sich von der neuesten
safe_search = 1, #Vermeiden Sie gewalttätige Bilder
extras = 'url_l, license' #Zusätzliche Informationen zu erhalten(URL zum Download, Lizenz)
)
return list(filter(lambda x : multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))
def multiConditionLicenses(src, license=None):
dst = []
if license is None:
dst.append(lambda x : 0 <= x)
else :
license_types = license.split("|")
for t in license_types:
if t == "All_Rights_Reserved": #Texter
dst.append(lambda x : x == 0)
elif t == "NonCommercial": #Nicht kommerziell
dst.append(lambda x : 1 <= x and x <= 3)
elif t == "Commercial": #Vermarktung
dst.append(lambda x : 4 <= x and x <= 6)
elif t == "UnKnown": #Vermarktung
dst.append(lambda x : x == 7)
elif t == "US_Government_Work": #Vermarktung
dst.append(lambda x : x == 8)
elif t == "PublicDomain": #Vermarktung
dst.append(lambda x : 9<= x and x <= 10)
return 0 < sum([item(src) for item in dst])
#Vom Bildlink herunterladen
def download_img(url, file_name):
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(file_name, 'wb') as f:
f.write(r.content)
if __name__ == "__main__":
#Starten Sie die Messung der Verarbeitungszeit
start = time.time()
#Abfrage abrufen
query = None
with open("query.txt") as fin:
query = fin.readlines()
query = [ q.strip() for q in query]
#Ordner speichern
for keyword in query:
savedir = os.path.join(imgdir, keyword)
#Wenn nicht, erstellen Sie einen Ordner
if not os.path.isdir(savedir):
os.mkdir(savedir)
photos = request_flickr(keyword, count=500, license="NonCommercial|Commercial")
for photo in filter(lambda p : "url_l" in p.keys(), photos):
url = photo['url_l']
filepath = os.path.join(os.path.join(imgdir, keyword), photo['id'] + '.jpg')
download_img(url, filepath)
time.sleep(1)
print('Verarbeitungszeit', (time.time() - start), "Sekunden")
Ich dachte, ich würde "concurrent.futures.ThreadPoolExecutor" für die parallele Verarbeitung verwenden.
Die Beschreibung von joblib
ist einfacher, deshalb werde ich sie verwenden. Sie können wie folgt in einer Zeile in Listeneinschlussnotation schreiben.
Parallel(n_jobs=8)([delayed({callback_func})(param1, param2, ...) for {element} in {list}])
Hier werden wir versuchen, zwei Hauptprozesse zu schichten und zu parallelisieren: Anfordern mehrerer Schlüsselwörter an die flickr-API und Abrufen mehrerer Bild-URLs von einem Schlüsselwort nach dem Abrufen der API-Antwort.
#Verarbeitung in der übergeordneten Hierarchie
def main_process(keyword, count=100, wait_time=1):
#Ergebnisse abrufen und speichern
photos = request_flickr(keyword, count=count)
#Bild herunterladen
#Filtern"url_l"Extrahieren Sie nur diejenigen, die enthalten(Aufrufer des untergeordneten Hierarchieprozesses)
Parallel(n_jobs=-1)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photos])
#Verarbeitung in untergeordneter Hierarchie
def sub_process(src, keyword, wait_time=1):
url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg " \
.format(farm_id=src["farm"],
server_id=src["server"],
id=src["id"],
secret=src["secret"])
filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
download_img(url, filepath)
time.sleep(wait_time)
if __name__ == "__main__":
...
query = ["Ikebukuro","Otsuka","Negamo","Komagome","Tabata"]
#Fordern Sie mehrere Schlüsselwörter für die flickr-API an(Aufrufer des übergeordneten Hierarchieprozesses)
Parallel(n_jobs=-1)([delayed(main_process)(keyword, count=500, wait_time=1) for keyword in query])
...
Der Parameter n_jobs
gibt die Anzahl der Prozesse an. Wenn es 1 ist, können Sie die tatsächliche sequentielle Verarbeitung angeben, und wenn es -1 ist, können Sie die maximale Anzahl der auszuführenden CPU-Prozesse angeben.
Als Schlüsselwort habe ich den Stationsnamen der Yamate-Linie verwendet.
query.txt
Ikebukuro
Otsuka
Negamo
Komagome
Tabata
Nishi Nippori
Nippori
Uguisudani
Ueno
Okachimachi
Akihabara
Kanda
Tokio
Yurakucho
Shimbashi
Hamamatsu Stadt
Tamachi
Shinagawa
Osaki
Gotanda
Meguro
Ebisu
Shibuya
Harajuku
Yoyogi
Shinjuku
Shin-Okubo
Takada Baba
Mejiro
from flickrapi import FlickrAPI
from urllib.request import urlretrieve
import requests
import os, time, sys
import configparser
import time
from joblib import Parallel, delayed
#Pfad des Bildordners
imgdir = os.path.join(os.getcwd(), "images")
__JOB_COUNT__ = 1
#Verwenden Sie die Flickr-API
def request_flickr(keyword, count=100, license=None):
#Erstellen Sie einen verbundenen Client und führen Sie eine Suche durch
config = configparser.ConfigParser()
config.read('secret.ini')
flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
result = flickr.photos.search(
text = keyword, #Suchbegriff
per_page = count, #Anzahl der erfassten Daten
media = 'photos', #Sammle Fotos
sort = 'relevance', #Holen Sie sich von der neuesten
safe_search = 1, #Vermeiden Sie gewalttätige Bilder
extras = 'license' #Zusätzliche Informationen zu erhalten(URL zum Download, Lizenz)
)
return list(filter(lambda x : multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))
def multiConditionLicenses(src, license=None):
dst = []
if license is None:
dst.append(lambda x : 0 <= x)
else :
license_types = license.split("|")
for t in license_types:
if t == "All_Rights_Reserved": #Texter
dst.append(lambda x : x == 0)
elif t == "NonCommercial": #Nicht kommerziell
dst.append(lambda x : 1 <= x and x <= 3)
elif t == "Commercial": #Vermarktung
dst.append(lambda x : 4 <= x and x <= 6)
elif t == "UnKnown": #Vermarktung
dst.append(lambda x : x == 7)
elif t == "US_Government_Work": #Vermarktung
dst.append(lambda x : x == 8)
elif t == "PublicDomain": #Vermarktung
dst.append(lambda x : 9<= x and x <= 10)
return 0 < sum([item(src) for item in dst])
#Vom Bildlink herunterladen
def download_img(url, file_name):
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(file_name, 'wb') as f:
f.write(r.content)
else :
print("not download:{}".format(url))
#Verarbeitung in der übergeordneten Hierarchie
def main_process(keyword, count=100, wait_time=1):
#Ergebnisse abrufen und speichern
photos = request_flickr(keyword, count=count)
#Bild herunterladen
#Filtern"url_l"Extrahieren Sie nur diejenigen, die enthalten(Aufrufer des untergeordneten Hierarchieprozesses)
Parallel(n_jobs=__JOB_COUNT__)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photo in photos ])
#Verarbeitung in untergeordneter Hierarchie
def sub_process(src, keyword, wait_time=1):
url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg " \
.format(farm_id=src["farm"],
server_id=src["server"],
id=src["id"],
secret=src["secret"])
filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
download_img(url, filepath)
time.sleep(wait_time)
if __name__ == "__main__":
#Starten Sie die Messung der Verarbeitungszeit
start = time.time()
#Abfrage abrufen
query = None
with open("query.txt") as fin:
query = fin.readlines()
query = [ q.strip() for q in query]
#Ordner speichern
for keyword in query:
savedir = os.path.join(imgdir, keyword)
#Wenn nicht, erstellen Sie einen Ordner
if not os.path.isdir(savedir):
os.mkdir(savedir)
#Fordern Sie mehrere Schlüsselwörter für die flickr-API an(Aufrufer des übergeordneten Hierarchieprozesses)
Parallel(n_jobs=__JOB_COUNT__)([delayed(main_process)(keyword, count=10, wait_time=1) for keyword in query])
print('Parallelverarbeitung', (time.time() - start), "Sekunden")
Der Unterschied zum letzten Mal besteht in der Verknüpfung von "https: // farm {farm-id} .staticflickr.com / {server-id} / {id} _ {secret} .jpg ", um das Bild sicherer zu erhalten. Ich benutze. (Siehe Flickr-API: Fotoquellen-URLs)
Diesmal ist der Lizenzparameter nicht festgelegt, da der Zweck die Verarbeitungsgeschwindigkeit des Downloads ist. count wird auf 10
gesetzt. Sie erhalten jetzt 290 Bilder. Die Ruhezeit nach dem Herunterladen von jeder Bild-URL ist auf 0,5 Sekunden eingestellt.
Also habe ich gemessen, wie schnell die Verarbeitungsgeschwindigkeit sein würde, wenn die Anzahl der Prozesse "1,2,4,8,16,24,32, max (-1)" wäre.
Anzahl der Prozesse | Verarbeitungszeit(sec) |
---|---|
1 | 360.21357011795044 |
2 | 83.60558104515076 |
4 | 27.984444856643677 |
8 | 11.372981071472168 |
16 | 8.048759937286377 |
24 | 11.179131984710693 |
32 | 11.573050022125244 |
max (n_jobs=-1) | 25.939302921295166 |
なまデータ
速度を対数にした時
Die Verarbeitung ist 40- bis 50-mal schneller als die sequentielle Verarbeitung. : schreien_katze: Obwohl es sich um einen Parameter von "n_jobs = -1" handelt, ist es schneller, einen festen Wert "16" einzugeben, obwohl der Maximalwert festgelegt ist. In der Ausführungsumgebung ist "import os os.cpu_count () = 4", was wahrscheinlich von der Anzahl der Prozesse in der CPU abhängt.
Abgesehen davon hat die flickr-API ein Limit von 3600 Daten / Stunde. Es scheint jedoch, dass es für viele Schleifenverarbeitungen verwendet werden kann.
In Fluent Python Kapitel 17 finden Sie ein Beispiel für das parallele Herunterladen von Flags. Diese flickr-API ist jedoch praktischer. Es ist auch ein guter Punkt, um als gutes Probenmaterial zu verwenden, wenn Sie "Future" verwenden oder wenn Sie eine optimierte Parallelverarbeitung durchführen möchten. : Curry:
Recommended Posts