[PYTHON] Accédez automatiquement au flux dans enebular et appuyez sur la gâchette

Je suis Niamugi, qui est en charge du deuxième jour du Calendrier de l'Avent enebular 2019. Cette fois, je voudrais vous montrer comment accéder automatiquement à http://enebular.com/app et déclencher le flux.

manière

Grattage

Déclencheur

Le flux Node-RED créé par enebular est enfin déployé sur un appareil ou un service. Même s'il y a un flux sur enebular, c'est un gaspillage de ne pas l'utiliser ... Donc, si le flux peut être ouvert et exécuté automatiquement, peut-il être utilisé? J'ai pensé, j'ai essayé.

Essaie

Ecrire un flux Node-RED pour le test

Cliquer sur le bouton du nœud d'injection "démarrer" est un flux qui exécute simplement GCP Cloud Functions avec une requête http. enebular_node_red.PNG

Grattage http://enebular.com/app

Nous étudierons étape par étape à partir de l'endroit où la page Web est ouverte. Le programme est écrit en python, ce qui est facile à gratter.

s'identifier

Ouvrez http://enebular.com/app et attendez de pouvoir saisir votre adresse e-mail.
WebDriverWait(driver, 5).until(ec.element_to_be_clickable((By.NAME, "email")))

Lorsqu'il s'ouvre, entrez votre adresse e-mail et votre mot de passe et appuyez sur Entrée.

id = driver.find_element_by_name("email")
id.send_keys(un)
password = driver.find_element_by_name("password")
password.send_keys(pw)
password.send_keys(Keys.ENTER)

Sélectionnez un actif

Attendez que l'élément que vous souhaitez sélectionner apparaisse. Cette fois, c'est un "test de grattage".
assetNm = "testscraping"
assetPath = '//span[@data-testid="' + assetNm +'"]'
WebDriverWait(driver, 5).until(ec.element_to_be_clickable((By.XPATH, assetPath)))

Cliquez sur l'actif lorsqu'il apparaît.

driver.find_element_by_xpath(assetPath).click()

Appuyez sur le bouton "modifier" dans l'aperçu

Attendez que l'aperçu des actifs apparaisse. Il est estimé que l'heure à laquelle le bouton Modifier sur le côté droit peut être cliqué est affichée.
editBtn = '//button[@data-testid="btn-edit-flow"]'
WebDriverWait(driver, 5).until(ec.element_to_be_clickable((By.XPATH, editBtn)))

Cliquez sur le bouton Modifier lorsqu'il apparaît.

driver.find_element_by_xpath(editBtn).click()

Changer d'onglet

Cliquez sur le bouton Modifier pour afficher le flux dans un onglet séparé. Par conséquent, il est nécessaire de changer d'onglet.

handle_array = driver.window_handles
driver.switch_to.window(handle_array[1])

Attendez que le flux apparaisse

** C'est le plus grand défi. ** ** Tout d'abord, "Loading ..." s'affiche.

Ensuite, le cadre de modification du flux s'affiche. ** Pour le moment, l'onglet de flux n'est pas encore affiché. ** ** Après un certain temps, les onglets de flux (factice et entrée cette fois) seront affichés.

Étant donné que l'affichage des images et la synchronisation d'affichage des onglets sont différents, une certaine ingéniosité est requise. Le problème a été résolu en répétant le chargement des cadres jusqu'à ce que le flux soit affiché correctement (= les onglets étaient affichés). (Il n'est pas bon d'attendre avec "while True" ... vous devriez avoir une limite de temps ...)

flowNm = 'input'
frame = '//div[@id="iframeBlock"]/iframe'
flowPath = '//a[@title="' + flowNm + '"]'

# Loading...Attends la fin
WebDriverWait(driver, 30).until(ec.frame_to_be_available_and_switch_to_it((By.XPATH, frame)))

#Attendez que le débit soit entièrement affiché
while True:
    #Annuler le cadre
    driver.switch_to.default_content()
    #Changer de cadre
    iframe = driver.find_element_by_xpath(frame)
    driver.switch_to_frame(iframe)        

    try:
        #Exception si le flux ne s'affiche pas correctement
        WebDriverWait(driver, 1).until(ec.visibility_of_element_located((By.XPATH, flowPath)))
        break
    except:
        pass

Cliquez sur l'onglet lorsqu'il apparaît.

driver.find_element_by_xpath(flowPath).click()

Appuyez sur la gâchette

Cette fois, cliquez sur le bouton du nœud d'injection "start" pour exécuter le flux. Il est difficile d'accéder au nœud.
triggerNodeNm = 'start'
try:
    #Déplacer vers le canevas
    workspace = driver.find_element_by_id('workspace')
    chart = workspace.find_element_by_id('chart')
    canvas = chart.find_element_by_class_name('innerCanvas')
    
    # class = "node nodegroup"Trouver
    nodes = canvas.find_elements_by_css_selector('.node.nodegroup')
    #Vérifiez si le nom du nœud correspond à celui spécifié
    #S'ils correspondent, appuyez sur la gâchette
    for node in nodes:
        nodeNm = node.find_element_by_tag_name('text').text
        if nodeNm == triggerNodeNm:
            node.find_element_by_class_name('node_button_button').click()
            break
        
except Exception as e:
    driver.quit()

Après avoir appuyé sur la gâchette, l'écran se ferme et le processus se termine.

driver.quit()

Résultat d'essayer

Lorsque vous exécutez le programme ... La fonction CloudFunctions a été exécutée!

Résumé

J'ai pu profiter du flux enebulaire en grattant. Cette méthode peut être suffisante si le flux ne dépend pas du système ou de l'interface sur lequel il est déployé. (Il y a un problème qui ne fonctionne pas si la balise ou le nom de classe de la page Web change ...)

J'espère que cela vous donnera quelques indices. À bientôt.

Recommended Posts

Accédez automatiquement au flux dans enebular et appuyez sur la gâchette
Accéder à l'API Twitter avec Python
Vérifiez que la page d'accueil de l'école est automatiquement mise à jour
Trouvez-le dans la file d'attente et modifiez-le
Accéder aux fichiers dans le même répertoire que le fichier exécutable
12. Enregistrez la première colonne dans col1.txt et la deuxième colonne dans col2.txt
Créez automatiquement des rapports Word et Excel avec Python
À propos de la différence entre "==" et "is" en python
Lorsque l'axe et l'étiquette se chevauchent dans matplotlib
Macports easy_install résout et exécute automatiquement la version
Accédez aux variables définies dans le script depuis REPL