Ich wollte nicht nur das Problem mit Paiza und AtCoder lösen, sondern auch mit dem Erstellen des Programms beginnen. Ich war auch inspiriert von diesem Artikel (das meiste, was Sie zum Programmieren eines Webdienstes tun müssen, wurde durch Scraping gelernt). Dieses Mal möchte ich die in Rikunabi Direct aufgeführten Unternehmen kratzen und in der DB registrieren.
Rikunabi Direct ist ein Dienst, der mehrere Unternehmen pro Woche anzeigt und vorstellt, die Rikunabi Direct ermittelt hat, um dem vom Arbeitssuchenden registrierten Arbeitssuchenden aus der Branche der Wahl zu entsprechen. Suchen von der Seite der Arbeitssuchenden sind nicht möglich. Derzeit wird eine Liste der börsennotierten Unternehmen bereitgestellt, aber es ist sehr zeitaufwändig oder rücksichtslos, insgesamt mehr als 16.000 Unternehmen vom Ende an zu betrachten. Daher habe ich darüber nachgedacht, es durchsuchbar zu machen, indem ich Unternehmensinformationen abgekratzt und in der Datenbank registriert habe.
Das Scrapen dauert zu lange (2,5 Sekunden pro Fall), daher habe ich zwei PhantomJS gestartet und versucht, durch parallele Verarbeitung zu beschleunigen.
main.py
import utils
import os
import MySQLdb
import time
import selenium
import settings
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver
utils.py
import sys
import requests
import csv
import os
import MySQLdb
import settings
import time
import selenium
from selenium import webdriver
main.py
NUMBER_OF_BROWSER = 2
browser = utils.generate_browser()
browser_2 = utils.generate_browser()
browser_list = [browser, browser_2]
browsers = []
for i in range(NUMBER_OF_BROWSERS):
browsers.append([browser_list[i], USER_IDs[i], PASS_WORDs[i]])
utils.py
PHANTOMJS_PATH = '/usr/local/bin/phantomjs'
def generate_browser():
browser = webdriver.PhantomJS(executable_path=PHANTOMJS_PATH)
print('PhantomJS initializing')
browser.implicitly_wait(3)
return browser
Warten implizit_warten (), um auf den Start von PhantomJS zu warten.
main.py
TIME_TO_WAIT = 5
USER_IDs = [USER_ID, USER_ID_2]
PASS_WORDs = [PASS_WORD, PASS_WORD_2]
browser = utils.generate_browser()
browser_2 = utils.generate_browser()
browser_list = [browser, browser_2]
for browser_param in browsers:
utils.login(browser_param[1], browser_param[2], browser_param[0])
utils.set_wait_time(TIME_TO_WAIT, browser_param[0])
utils.check_current_url(browser_param[0])
utils.py
def generate_browser():
browser = webdriver.PhantomJS(executable_path=PHANTOMJS_PATH)
print('PhantomJS initializing')
browser.implicitly_wait(3)
return browser
def set_wait_time(time, browser):
browser.set_page_load_timeout(time)
def login(user, pass_word, browser):
#Rufen Sie die Anmeldeseite auf
url_login = 'https://rikunabi-direct.jp/2020/login/'
browser.get(url_login)
#Zugriffsurteil. Beenden Sie, wenn Sie nicht erreichbar sind
test = requests.get(url_login)
status_code = test.status_code
if status_code == 200:
print('HTTP status code ' + str(status_code) + ':Rufen Sie die Anmeldeseite auf')
else:
print('HTTP status code ' + str(test.status_code) + ':Zugriff auf die Anmeldeseite nicht möglich')
sys.exit()
time.sleep(5)
#Geben Sie Benutzer und Passwort ein
#user
element = browser.find_element_by_name('accountId')
element.clear()
element.send_keys(user)
print('Ich habe Benutzer eingegeben')
#Passwort
element = browser.find_element_by_name('password')
element.clear()
element.send_keys(pass_word)
print('Ich habe das Passwort eingegeben')
#Senden
submit = browser.find_element_by_xpath("//img[@alt='Einloggen']")
submit.click()
print('Ich habe den Login-Button gedrückt')
Verwenden Sie browse.get (url), um zur URL-Seite zu gelangen, und browser.find_element_by_name (), um den Wert des Namensattributs anzugeben und das gewünschte Element zu finden. Die Elemente, die Sie suchen möchten, sind wie folgt
<input type="text" name="accountId" autocomplete="off" value="" ...
Holen Sie sich das Element mit browser.find_element_by_name ('accountID'). Löschen Sie das Textfeld des erfassten Elements mit clear () und geben Sie die userID mit send_keys () ein. Geben Sie das Passwort auf die gleiche Weise ein. Die Senden-Schaltfläche wurde durch Angabe mit xpath erhalten. xpath stammt aus der Google Chrome-Validierung. Klicken Sie mit der rechten Maustaste auf> Kopieren> XPath kopieren, um es zu kopieren (zuerst kannte ich diesen bequemen Weg nicht und gab ihn mit einem absoluten Pfad wie / html / body / ... an. Ich hatte viel Ärger.) Für find_element_by_ habe ich auf [hier] verwiesen (https://kurozumi.github.io/selenium-python/locating-elements.html).
main.py
#Wenn csv nicht vorhanden ist, speichern Sie alle URLs in einem Array und exportieren Sie sie als csv
if os.path.exists(URL_PATH) == False:
utils.move_to_company_list(browsers[0][0])
url_arr = utils.get_url(NUMBER_OF_COMPANY, browsers[0][0])
utils.export_csv(url_arr, URL_PATH)
utils.browser_close(browsers[0][0])
else:
#Wenn CSV vorhanden ist, lesen Sie CSV und URL_In arr
url_arr = utils.import_csv(URL_PATH)
utils.py
def move_to_company_list(browser):
#Gehen Sie zur Seite aller börsennotierten Unternehmen
element = browser.find_element_by_link_text('Alle börsennotierten Unternehmen')
element.click()
#Es wird in einer anderen Registerkarte geöffnet. Wechseln Sie also zur zweiten Registerkarte
browser.switch_to_window(browser.window_handles[1])
#Holen Sie sich die URLs aller gelisteten Unternehmen
def get_url(number_of_company, browser):
url_arr = []
for i in range(2, number_of_company):
url_xpath = '/html/body/div/div/table/tbody/tr[{0}]/td/ul/li/a'.format(i)
element = browser.find_element_by_xpath(url_xpath)
url = element.get_attribute('href')
url_arr.append(url)
print(str(i))
print(url)
return url_arr
#Exportieren Sie das Array in CSV
def export_csv(arr, csv_path):
with open(csv_path, 'w') as f:
writer = csv.writer(f, lineterminator='\n')
writer.writerow(arr)
#Schließen Sie die aktuelle Registerkarte und kehren Sie zur ersten Registerkarte zurück
def browser_close(browser):
browser.close()
browser.switch_to_window(browser.window_handles[0])
def import_csv(csv_path):
if os.path.exists(csv_path) == True:
with open(csv_path, 'r') as f:
data = list(csv.reader(f))#Zweidimensionales Array.Das 0. Element ist ein Array von URLs.
return data[0]
else:
print('csv existiert nicht')
sys.exit()
Es braucht jedes Mal Zeit, um die URL abzurufen. Wenn Sie die URL erhalten haben, schreiben Sie sie an csv und speichern Sie sie.
main.py
url_arrs = list(np.array_split(url_arr, NUMBER_OF_BROWSERS))
for i in range(NUMBER_OF_BROWSERS):
print('length of array{0} : '.format(i) + str(len(url_arrs[i])))
main.py
connector = MySQLdb.connect(
unix_socket = DB_UNIX_SOCKET,
host=DB_HOST, user=DB_USER, passwd=DB_PASS_WORD, db=DB_NAME
)
corsor = connector.cursor()
main.py
#Scraping-Verarbeitung in jedem Browser durchführen (Parallelverarbeitung))
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
for i in range(NUMBER_OF_BROWSERS):
executor.submit(utils.scraping_process, browsers[i][0], url_arrs[i], corsor, connector)
utils.py
def open_new_page(url, browser):
try:
browser.execute_script('window.open()')
browser.switch_to_window(browser.window_handles[1])
browser.get(url)
except selenium.common.exceptions.TimeoutException:
browser_close(browser)
print('connection timeout')
print('retrying ...')
open_new_page(url, browser)
def content_scraping(corsor, connector, browser):
#Finde ein Kratzziel
name_element = browser.find_element_by_class_name('companyDetail-companyName')
position_element = browser.find_element_by_xpath('//div[@class="companyDetail-sectionBody"]/p[1]')
job_description_element = browser.find_element_by_xpath('//div[@class="companyDetail-sectionBody"]/p[2]')
company_name = name_element.text
position = position_element.text
job_description = job_description_element.text
url = browser.current_url
casual_flag = is_exist_casual(browser)
#----------DB Registrierungsprozess unten----------#
#INSERT
corsor.execute('INSERT INTO company_data_2 SET name="{0}", url="{1}", position="{2}", description="{3}", is_casual="{4}"'.format(company_name, url, position, job_description, casual_flag))
connector.commit()
def scraping_process(browser, url_arr, corsor, connector):
count = 0
for url in url_arr:
open_new_page(url, browser)
print('{0} scraping start'.format(count))
check_current_url(browser)
try:
content_scraping(corsor, connector, browser)
except selenium.common.exceptions.NoSuchElementException:
print('Unternehmen, die derzeit nicht gelistet sind')
except MySQLdb._exceptions.ProgrammingError:
print('SQL programming Error')
browser_close(browser)
print('{0} scraping process end.'.format(count))
count += 1
Der im abgerufenen Element beschriebene Text kann in element.text verwendet werden. Wenn für oprn_new_page () eine erwartete Ausnahme (in diesem Fall TimeoutExecption) auftritt, warten Sie eine Weile und implementieren Sie einen rekursiven Wiederholungsprozess, um sich erneut aufzurufen und eine Verbindung herzustellen. Ich habe rekursive Funktionen bei der Lösung von Problemen mit AtCoder und anderen kennengelernt. Ich hatte keine Ahnung, wo ich es in der eigentlichen Verarbeitung verwenden sollte, aber dieses Mal konnte ich selbst ein Verwendungsbeispiel erstellen, indem ich selbst ein Programm erstellte.
Es gibt fünf Arten von Informationen, die Sie löschen möchten: Firmenname, URL, Jobtyp, Jobbeschreibung und ob Sie in Zivil arbeiten können. Ruft das Element anhand des Werts der Attribute Xpath und name ab.
Einige der börsennotierten Unternehmen haben die Veröffentlichung eingestellt. In diesem Fall versucht find_element_by, das nicht vorhandene Element abzurufen, und zu diesem Zeitpunkt tritt eine NoSuchElementException auf. In diesem Fall fangen Sie dies. ProgrammingError wird von MySQLdb zurückgegeben, wenn job_description usw. einfache oder doppelte Anführungszeichen enthält. Jemand sagt mir, wie man so etwas wie die PDO-Vorbereitungsanweisung für PHP macht ~~~~~~~~~~
Scraping_process (), das den Prozess tatsächlich ausführt, ist der Ablauf des Öffnens der Zielseite> Angeben des Elements und Abrufen> DB-Registrierung. Der zeitaufwändigste Teil dieses Prozesses liegt zwischen dem Öffnen der ersten Seite und dem Erfassen des nächsten Elements. Dies liegt daran, dass das Anzeigen der Seite nach dem Öffnen lange dauert. Um die Verlangsamung aufgrund dieses Teils zu verbessern, wird die parallele Verarbeitung mit ThreadPoolExecutor von concurrent.futures durchgeführt. Ich denke, dass sich Browser2 in einem Zustand befindet, in dem Browser1 darauf wartet, dass die Seite angezeigt wird. Dies macht es viel schneller als die Verarbeitung mit einem einzigen Browser.
Wir konnten fast 16.000 Unternehmen erfolgreich abkratzen. Ich habe viel gelernt, indem ich selbst eine rekursive Funktion erstellt, die Struktur von HTML zur Angabe von xpath untersucht und versucht habe, den letzten Prozess zu parallelisieren, indem ich ihn in eine Funktion gruppiert habe. Es war.
* 1 Selenium Python Bindings 4. Find Elements [Python] Verwendung von Selen Zusammenfassung der Auswahl von Elementen mit Selen Selenium API (Reverse Pull) Überprüfen Sie die Existenz der Datei mit Python XPath in Chrome übernehmen und überprüfen Parallele Taskausführung mit concurrent.futures in Python Ich habe die Parallelverarbeitung und Parallelverarbeitung von Python gründlich untersucht Ich habe MySQL von Python 3 auf einem Mac aus betrieben
Recommended Posts