Ich wollte dynamisch Gzip-Daten in [Flask Streaming Contents] erstellen und zurückgeben.
GzipStream-Klasse (selbst erstellt).
#!/usr/bin/env python
# encoding: utf-8
from gzip import GzipFile
from StringIO import StringIO
class GzipStream(object):
def __init__(self):
self._io = StringIO()
self._gf = GzipFile(fileobj=self._io, mode='wb')
self._pos = 0
def push(self, content):
'''
push a part of content.
'''
assert not self._gf.closed
self._gf.write(content)
self._gf.flush()
def pop(self):
'''
:return: A part of gzipped content, or "None" that means EOF.
'''
current_pos = self._io.len
if self._gf.closed and current_pos == self._pos:
return None
part = self._io.getvalue()[self._pos:current_pos]
self._pos = current_pos
return part
def close(self):
'''
you must call this.
'''
if not self._gf.closed:
self._gf.close()
def __enter__(self):
'''Das musst du nicht'''
return self
def __exit__(self, exc_type, exc_value, traceback):
'''Das musst du nicht'''
self.close()
Versuchen Sie es wie folgt.
>>> from streamgzip import GzipStream
>>> gs = GzipStream()
>>> gs.push('hello ')
>>> gs.pop()
'\x1f\x8b\x08\x00\xfd\xcf\x0cX\x02\xff\xcaH\xcd\xc9\xc9W\x00\x00\x00\x00\xff\xff'
>>> gs.push('world ')
>>> gs.push('!!')
>>> gs.pop()
'*\xcf/\xcaIQ\x00\x00\x00\x00\xff\xffRT\x04\x00\x00\x00\xff\xff'
>>> gs.close()
>>> gs.pop()
'\x03\x00\xb7\xc8\xd4%\x0e\x00\x00\x00'
>>> print gs.pop()
None
Versuchen Sie zu überprüfen, ob die Daten korrekt sind.
>>> from gzip import GzipFile
>>> from StringIO import StringIO
>>> io = StringIO()
>>> io.write('\x1f\x8b\x08\x00\xfd\xcf\x0cX\x02\xff\xcaH\xcd\xc9\xc9W\x00\x00\x00\x00\xff\xff*\xcf/\xcaIQ\x00\x00\x00\x00\xff\xffRT\x04\x00\x00\x00\xff\xff\x03\x00\xb7\xc8\xd4%\x0e\x00\x00\x00')
>>> io.seek(0)
>>> gzfile = GzipFile(fileobj=io, mode='rb')
>>> gzfile.read()
'hello world !!'
Recommended Posts