J'ai essayé de refactoriser le code du modèle affiché dans "Obtenir des images de l'API Flickr avec Python" (Partie 1) est une continuation. Le processus d'acquisition d'images à l'aide de l'API Flickr est terminé. Cependant, il est difficile d'acquérir de nombreuses images en utilisant de nombreux mots-clés car elles sont traitées séquentiellement. Ici, je voudrais le modifier en traitement parallèle et vérifier à quel point la vitesse de traitement est bonne.
from flickrapi import FlickrAPI
import requests
import os, time, sys
import configparser
import time
#Chemin du dossier d'image
imgdir = os.path.join(os.getcwd(), "images")
#Utilisez l'API Flickr
def request_flickr(keyword, count=100, license=None):
#Créez un client connecté et effectuez une recherche
config = configparser.ConfigParser()
config.read('secret.ini')
flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
result = flickr.photos.search(
text = keyword, #Mot-clé de recherche
per_page = count, #Nombre de données acquises
media = 'photos', #Collectez des photos
sort = 'relevance', #Obtenez des dernières
safe_search = 1, #Évitez les images violentes
extras = 'url_l, license' #Informations supplémentaires à obtenir(URL de téléchargement, licence)
)
return list(filter(lambda x : multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))
def multiConditionLicenses(src, license=None):
dst = []
if license is None:
dst.append(lambda x : 0 <= x)
else :
license_types = license.split("|")
for t in license_types:
if t == "All_Rights_Reserved": #Rédacteur
dst.append(lambda x : x == 0)
elif t == "NonCommercial": #Non commercial
dst.append(lambda x : 1 <= x and x <= 3)
elif t == "Commercial": #Commercialisation
dst.append(lambda x : 4 <= x and x <= 6)
elif t == "UnKnown": #Commercialisation
dst.append(lambda x : x == 7)
elif t == "US_Government_Work": #Commercialisation
dst.append(lambda x : x == 8)
elif t == "PublicDomain": #Commercialisation
dst.append(lambda x : 9<= x and x <= 10)
return 0 < sum([item(src) for item in dst])
#Télécharger à partir du lien d'image
def download_img(url, file_name):
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(file_name, 'wb') as f:
f.write(r.content)
if __name__ == "__main__":
#Démarrer la mesure du temps de traitement
start = time.time()
#Obtenir la requête
query = None
with open("query.txt") as fin:
query = fin.readlines()
query = [ q.strip() for q in query]
#Enregistrer le dossier
for keyword in query:
savedir = os.path.join(imgdir, keyword)
#Sinon, créez un dossier
if not os.path.isdir(savedir):
os.mkdir(savedir)
photos = request_flickr(keyword, count=500, license="NonCommercial|Commercial")
for photo in filter(lambda p : "url_l" in p.keys(), photos):
url = photo['url_l']
filepath = os.path.join(os.path.join(imgdir, keyword), photo['id'] + '.jpg')
download_img(url, filepath)
time.sleep(1)
print('temps de traitement', (time.time() - start), "Secondes")
--Je souhaite traiter plusieurs recherches par mots-clés en utilisant l'API Flickr en parallèle. --Je souhaite traiter le processus de téléchargement à partir du lien image en parallèle
J'ai pensé que j'utiliserais «concurrent.futures.ThreadPoolExecutor» pour le traitement parallèle.
La description de joblib
est plus simple, je vais donc l'utiliser. Vous pouvez écrire sur une ligne dans la notation d'inclusion de liste comme suit.
Parallel(n_jobs=8)([delayed({callback_func})(param1, param2, ...) for {element} in {list}])
Ici, nous allons essayer de superposer et de paralléliser deux processus principaux: demander plusieurs mots-clés à l'API Flickr et acquérir plusieurs URL d'image à partir d'un mot-clé après l'acquisition de la réponse de l'API.
#Traitement dans la hiérarchie parente
def main_process(keyword, count=100, wait_time=1):
#Récupération et stockage des résultats
photos = request_flickr(keyword, count=count)
#Télécharger l'image
#Clé"url_l"Extrayez uniquement ceux qui contiennent(Appelant du processus de hiérarchie enfant)
Parallel(n_jobs=-1)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photos])
#Traitement dans la hiérarchie enfant
def sub_process(src, keyword, wait_time=1):
url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg " \
.format(farm_id=src["farm"],
server_id=src["server"],
id=src["id"],
secret=src["secret"])
filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
download_img(url, filepath)
time.sleep(wait_time)
if __name__ == "__main__":
...
query = ["Ikebukuro","Otsuka","Negamo","Komagome","Tabata"]
#Demander plusieurs mots-clés à l'API Flickr(Appelant du processus de hiérarchie parent)
Parallel(n_jobs=-1)([delayed(main_process)(keyword, count=500, wait_time=1) for keyword in query])
...
Le paramètre n_jobs
représente le nombre de processus. S'il est 1, vous pouvez spécifier le traitement séquentiel réel, et s'il est égal à -1, vous pouvez spécifier le nombre maximum de processus CPU à exécuter.
Comme mot-clé, j'ai utilisé le nom de la station de la ligne Yamate.
query.txt
Ikebukuro
Otsuka
Negamo
Komagome
Tabata
Nishi Nippori
Nippori
Uguisudani
Ueno
Okachimachi
Akihabara
Kanda
Tokyo
Yurakucho
Shimbashi
Ville de Hamamatsu
Tamachi
Shinagawa
Osaki
Gotanda
Meguro
Ebisu
Shibuya
Harajuku
Yoyogi
Shinjuku
Shin-Okubo
Takada Baba
Mejiro
from flickrapi import FlickrAPI
from urllib.request import urlretrieve
import requests
import os, time, sys
import configparser
import time
from joblib import Parallel, delayed
#Chemin du dossier d'image
imgdir = os.path.join(os.getcwd(), "images")
__JOB_COUNT__ = 1
#Utilisez l'API Flickr
def request_flickr(keyword, count=100, license=None):
#Créez un client connecté et effectuez une recherche
config = configparser.ConfigParser()
config.read('secret.ini')
flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
result = flickr.photos.search(
text = keyword, #Mot-clé de recherche
per_page = count, #Nombre de données acquises
media = 'photos', #Collectez des photos
sort = 'relevance', #Obtenez des dernières
safe_search = 1, #Évitez les images violentes
extras = 'license' #Informations supplémentaires à obtenir(URL de téléchargement, licence)
)
return list(filter(lambda x : multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))
def multiConditionLicenses(src, license=None):
dst = []
if license is None:
dst.append(lambda x : 0 <= x)
else :
license_types = license.split("|")
for t in license_types:
if t == "All_Rights_Reserved": #Rédacteur
dst.append(lambda x : x == 0)
elif t == "NonCommercial": #Non commercial
dst.append(lambda x : 1 <= x and x <= 3)
elif t == "Commercial": #Commercialisation
dst.append(lambda x : 4 <= x and x <= 6)
elif t == "UnKnown": #Commercialisation
dst.append(lambda x : x == 7)
elif t == "US_Government_Work": #Commercialisation
dst.append(lambda x : x == 8)
elif t == "PublicDomain": #Commercialisation
dst.append(lambda x : 9<= x and x <= 10)
return 0 < sum([item(src) for item in dst])
#Télécharger à partir du lien d'image
def download_img(url, file_name):
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(file_name, 'wb') as f:
f.write(r.content)
else :
print("not download:{}".format(url))
#Traitement dans la hiérarchie parente
def main_process(keyword, count=100, wait_time=1):
#Récupération et stockage des résultats
photos = request_flickr(keyword, count=count)
#Télécharger l'image
#Clé"url_l"Extrayez uniquement ceux qui contiennent(Appelant du processus de hiérarchie enfant)
Parallel(n_jobs=__JOB_COUNT__)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photo in photos ])
#Traitement dans la hiérarchie enfant
def sub_process(src, keyword, wait_time=1):
url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg " \
.format(farm_id=src["farm"],
server_id=src["server"],
id=src["id"],
secret=src["secret"])
filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
download_img(url, filepath)
time.sleep(wait_time)
if __name__ == "__main__":
#Démarrer la mesure du temps de traitement
start = time.time()
#Obtenir la requête
query = None
with open("query.txt") as fin:
query = fin.readlines()
query = [ q.strip() for q in query]
#Enregistrer le dossier
for keyword in query:
savedir = os.path.join(imgdir, keyword)
#Sinon, créez un dossier
if not os.path.isdir(savedir):
os.mkdir(savedir)
#Demander plusieurs mots-clés à l'API Flickr(Appelant du processus de hiérarchie parent)
Parallel(n_jobs=__JOB_COUNT__)([delayed(main_process)(keyword, count=10, wait_time=1) for keyword in query])
print('Traitement parallèle', (time.time() - start), "Secondes")
La différence avec la dernière fois est le lien de https: // farm {farm-id} .staticflickr.com / {server-id} / {id} _ {secret} .jpg
pour obtenir l'image plus sûrement. J'utilise. (Voir API Flickr: URL des sources de photos)
Cette fois, le paramètre de licence n'est pas défini car l'objectif est la vitesse de traitement du téléchargement. count est réglé sur «10». Vous obtenez maintenant des images «290». Le temps de veille après le téléchargement à partir de chaque URL d'image est défini sur «0,5» seconde. Donc, j'ai mesuré la vitesse de traitement lorsque le nombre de processus était «1,2,4,8,16,24,32, max (-1)».
Nombre de processus | temps de traitement(sec) |
---|---|
1 | 360.21357011795044 |
2 | 83.60558104515076 |
4 | 27.984444856643677 |
8 | 11.372981071472168 |
16 | 8.048759937286377 |
24 | 11.179131984710693 |
32 | 11.573050022125244 |
max (n_jobs=-1) | 25.939302921295166 |
なまデータ
速度を対数にした時
Le traitement est effectué 40 à 50 fois plus rapidement que le traitement séquentiel. : scream_cat:
Bien que ce soit un paramètre de n_jobs = -1
, il est plus rapide de saisir une valeur fixe 16
même si la valeur maximale est définie. Dans l'environnement d'exécution, ʻimport os os.cpu_count () = 4 `, donc cela dépend probablement du nombre de processus dans le cpu.
En passant, l'API flickr a une limite de 3600 données / heure. Cependant, il semble qu'il puisse être utilisé pour de nombreux traitements en boucle.
Fluent Python Chapitre 17 propose un exemple de téléchargement de drapeaux en parallèle, mais cette API Flickr est plus pratique. C'est également un bon point à utiliser comme bon échantillon de matériau lors de l'utilisation de «Future» ou lorsque vous souhaitez effectuer un traitement parallèle plus optimisé. : curry:
Recommended Posts