J'ai créé quelque chose qui extrait les messages de Gmail avec des notifications push en utilisant imap4lib.
Vous devez d'abord activer la connexion IMAP côté Gmail. Consultez l'aide de Gmail pour obtenir de la documentation. https://support.google.com/mail/answer/7126229?hl=ja
Connectons-nous quand tu es prêt
import imaplib
def connect_with_server():
imap_server = 'imap.gmail.com'
imap_port = '993'
imap_user = 'Username' #adresse e-mail Gmail(e.g.en quelque [email protected])
imap_pass = 'Password' #mot de passe de connexion gmail
try:
imap = imaplib.IMAP4_SSL(imap_server,imap_port)
imap.login(imap_user,imap_pass)
except imaplib.IMAP4_SSL.error as e:
print(e)
return False
else:
return imap
Vous pouvez maintenant vous connecter en appelant connect_with_server ()
.
b'[ALERT] Please log in via your web browser: https://support.google.com/mail/accounts/answer/78754 (Failure)'
Si vous obtenez l'erreur, veuillez vous référer à ce qui suit.
http://opendata.jp.net/?p=8498
Le flux est brièvement expliqué ci-dessous.
DONE
entraînera la sortie de ʻIDLE` par le serveur.Le moment auquel le serveur envoie un message au client après «IDLE» est lorsqu'une mise à jour se produit dans la boîte aux lettres </ b>. En d'autres termes, lorsqu'un nouveau courrier arrive ou lorsqu'un autre client émet une commande ʻEXPUNGE` ... etc. De plus, le serveur ne répondra pas aux demandes des clients tant que «DONE» ne sera pas envoyé.
En outre, il peut expirer pendant l'écoute, vous devrez donc vous reconnecter toutes les 29 minutes.
Ce qui suit est une citation de la première URL de référence.
Example: (Omission)
C: A002 IDLE
S: + idling
...time passes; new mail arrives...
S: * 4 EXISTS
C: DONE
S: A002 OK IDLE terminated
...another client expunges message 2 now...
C: A003 FETCH 4 ALL
S: * 4 FETCH (...)
S: A003 OK FETCH completed
C: A004 IDLE
S: * 2 EXPUNGE
S: * 3 EXISTS
S: + idling
(Ce qui suit est omis)
Pour le moment
DONE
et fermez ʻIDLE`N'est-ce pas correct?
Vous recevrez réellement la poussée.
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
#some codes...
else:
#some codes...
J'ai préparé une fonction qui décode le corps de l'en-tête MIME.
Décodez uniquement Content-Type: text / plain
et ignorez le reste.
import email
import base64
import quopri
def get_info_from_header(headers):
mail_transfer_encoding = ''
mail_encoding = ''
for headinfo in headers:
if headinfo[0] == 'Content-Type':
charset = headinfo[1].split(';')[1]
if 'charset="' in charset:
mail_encoding = charset.split('charset="')[1]
mail_encoding = mail_encoding[:-1]
elif 'charset=' in charset:
mail_encoding = charset.split('charset=')[1]
else:
continue
elif headinfo[0] == 'Content-Transfer-Encoding':
mail_transfer_encoding = headinfo[1]
else:
continue
return mail_transfer_encoding , mail_encoding
def mail_to_txt(mail):
mail_transfer_encoding = ''
mail_encoding = 'ISO-2022-JP'
mail_body = ''
mail_data = email.message_from_string(mail)
if mail_data.is_multipart():
for payload in mail_data.get_payload():
if payload.get_content_type() == 'text/plain':
headers = payload._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
mail_body = payload.get_payload()
else:
if mail_data.get_content_type() == 'text/plain':
mail_body = mail_data.get_payload()
headers = mail_data._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
if mail_transfer_encoding == 'base64':
mail_body = base64.urlsafe_b64decode(mail_body.encode('ASCII'))
elif mail_transfer_encoding == 'quoted-printable':
mail_body = quopri.decodestring(mail_body.encode('ASCII'))
else:
mail_body = mail_body.encode('utf-8')
mail_body = mail_body.decode(mail_encoding)
return mail_body
Après cela, si vous lancez les données converties par utf-8 après les avoir récupérées avec Fetch vers mail_to_txt ()
, vous devriez probablement cracher un corps lisible.
def main_routine():
text_list=[]
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
print(imap_line)
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
typ,data = imap.search(None,'UNSEEN')
if data[0] != '':
mail_ids = data[0].split()
for mail_id in mail_ids:
typ,data = imap.fetch(mail_id,'(RFC822)')
mail = data[0][1]
text_list.append(mail_to_txt(mail.decode('UTF-8')))
imap.store(mail_id,'+FLAGS','\\Seen')
else:
print('No unread e-mails')
print('Terminating FETCH')
else:
print('Jumped out')
imap.close()
imap.logout()
print('Logged out')
return text_list
if __name__ == '__main__':
while True:
mail_list = mail_routine()
for mail in mail_list:
print(mail)
Le résultat de l'exécution est le suivant.
Waiting for a message...
+ idling
* 12 EXISTS
You got a message.
Terminating IDLE mode
Terminating FETCH
Logged out
Test de test
Hello world!!
Tesuto Tesuto
Recommended Posts