Ich habe etwas gemacht, das Nachrichten aus Google Mail mit Push-Benachrichtigungen mit imap4lib abruft.
Zuerst müssen Sie die IMAP-Verbindung auf der Google Mail-Seite aktivieren. Dokumentation finden Sie in der Google Mail-Hilfe. https://support.google.com/mail/answer/7126229?hl=ja
Lassen Sie uns verbinden, wenn Sie bereit sind
import imaplib
def connect_with_server():
imap_server = 'imap.gmail.com'
imap_port = '993'
imap_user = 'Username' #Google Mail-E-Mail-Adresse([email protected])
imap_pass = 'Password' #Google Mail-Anmeldekennwort
try:
imap = imaplib.IMAP4_SSL(imap_server,imap_port)
imap.login(imap_user,imap_pass)
except imaplib.IMAP4_SSL.error as e:
print(e)
return False
else:
return imap
Sie können jetzt eine Verbindung herstellen, indem Sie "connect_with_server ()" aufrufen.
b'[ALERT] Please log in via your web browser: https://support.google.com/mail/accounts/answer/78754 (Failure)'
Wenn Sie den Fehler erhalten, lesen Sie bitte Folgendes.
http://opendata.jp.net/?p=8498
Der Ablauf wird im Folgenden kurz erläutert.
IDLE
an den Server senden.Der Zeitpunkt, zu dem der Server nach "IDLE" eine Nachricht an den Client sendet, ist , wenn eine Aktualisierung in der Mailbox </ b> erfolgt. Mit anderen Worten, wenn eine neue Mail eintrifft oder wenn ein anderer Client einen Befehl "EXPUNGE" ausgibt ... usw. Außerdem antwortet der Server nicht auf Anfragen von Clients, bis "FERTIG" gesendet wird.
Außerdem kann es während des Abhörens zu einer Zeitüberschreitung kommen, sodass Sie alle 29 Minuten eine neue Verbindung herstellen müssen.
Das Folgende ist ein Zitat aus der ersten Referenz-URL.
Example: (Unterlassung)
C: A002 IDLE
S: + idling
...time passes; new mail arrives...
S: * 4 EXISTS
C: DONE
S: A002 OK IDLE terminated
...another client expunges message 2 now...
C: A003 FETCH 4 ALL
S: * 4 FETCH (...)
S: A003 OK FETCH completed
C: A004 IDLE
S: * 2 EXPUNGE
S: * 3 EXISTS
S: + idling
(Folgendes wird weggelassen)
Vorerst
Ist es nicht okay?
Sie erhalten tatsächlich den Push.
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
#some codes...
else:
#some codes...
Ich habe eine Funktion vorbereitet, die den Body aus dem MIME-Header dekodiert.
Dekodiere nur Content-Type: text / plain
und ignoriere den Rest.
import email
import base64
import quopri
def get_info_from_header(headers):
mail_transfer_encoding = ''
mail_encoding = ''
for headinfo in headers:
if headinfo[0] == 'Content-Type':
charset = headinfo[1].split(';')[1]
if 'charset="' in charset:
mail_encoding = charset.split('charset="')[1]
mail_encoding = mail_encoding[:-1]
elif 'charset=' in charset:
mail_encoding = charset.split('charset=')[1]
else:
continue
elif headinfo[0] == 'Content-Transfer-Encoding':
mail_transfer_encoding = headinfo[1]
else:
continue
return mail_transfer_encoding , mail_encoding
def mail_to_txt(mail):
mail_transfer_encoding = ''
mail_encoding = 'ISO-2022-JP'
mail_body = ''
mail_data = email.message_from_string(mail)
if mail_data.is_multipart():
for payload in mail_data.get_payload():
if payload.get_content_type() == 'text/plain':
headers = payload._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
mail_body = payload.get_payload()
else:
if mail_data.get_content_type() == 'text/plain':
mail_body = mail_data.get_payload()
headers = mail_data._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
if mail_transfer_encoding == 'base64':
mail_body = base64.urlsafe_b64decode(mail_body.encode('ASCII'))
elif mail_transfer_encoding == 'quoted-printable':
mail_body = quopri.decodestring(mail_body.encode('ASCII'))
else:
mail_body = mail_body.encode('utf-8')
mail_body = mail_body.decode(mail_encoding)
return mail_body
Wenn Sie danach die von utf-8 konvertierten Daten werfen, nachdem Sie sie mit Fetch nach "mail_to_txt ()" gebracht haben, sollten Sie wahrscheinlich einen lesbaren Text ausspucken.
def main_routine():
text_list=[]
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
print(imap_line)
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
typ,data = imap.search(None,'UNSEEN')
if data[0] != '':
mail_ids = data[0].split()
for mail_id in mail_ids:
typ,data = imap.fetch(mail_id,'(RFC822)')
mail = data[0][1]
text_list.append(mail_to_txt(mail.decode('UTF-8')))
imap.store(mail_id,'+FLAGS','\\Seen')
else:
print('No unread e-mails')
print('Terminating FETCH')
else:
print('Jumped out')
imap.close()
imap.logout()
print('Logged out')
return text_list
if __name__ == '__main__':
while True:
mail_list = mail_routine()
for mail in mail_list:
print(mail)
Das Ausführungsergebnis ist wie folgt.
Waiting for a message...
+ idling
* 12 EXISTS
You got a message.
Terminating IDLE mode
Terminating FETCH
Logged out
Test Test
Hello world!!
Tesuto Tesuto
Recommended Posts