cpython: f296d7d82675 (original) (raw)
--- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -50,8 +50,8 @@ class BufferedSubFile(object): simple abstraction -- it parses until EOF closes the current message. """ def init(self):
# The last partial line pushed into this object.[](#l1.7)
self._partial = ''[](#l1.8)
# Chunks of the last partial line pushed into this object.[](#l1.9)
self._partial = [][](#l1.10) # The list of full, pushed lines, in reverse order[](#l1.11) self._lines = [][](#l1.12) # The stack of false-EOF checking predicates.[](#l1.13)
@@ -67,8 +67,8 @@ class BufferedSubFile(object): def close(self): # Don't forget any trailing partial line.
self._lines.append(self._partial)[](#l1.18)
self._partial = ''[](#l1.19)
self.pushlines(''.join(self._partial).splitlines(True))[](#l1.20)
self._partial = [][](#l1.21) self._closed = True[](#l1.22)
def readline(self): @@ -96,16 +96,26 @@ class BufferedSubFile(object): def push(self, data): """Push some new data into this object."""
# Handle any previous leftovers[](#l1.29)
data, self._partial = self._partial + data, ''[](#l1.30) # Crack into lines, but preserve the linesep characters on the end of each[](#l1.31) parts = data.splitlines(True)[](#l1.32)
if not parts or not parts[0].endswith(('\n', '\r')):[](#l1.34)
# No new complete lines, so just accumulate partials[](#l1.35)
self._partial += parts[](#l1.36)
return[](#l1.37)
if self._partial:[](#l1.39)
# If there are previous leftovers, complete them now[](#l1.40)
self._partial.append(parts[0])[](#l1.41)
parts[0:1] = ''.join(self._partial).splitlines(True)[](#l1.42)
del self._partial[:][](#l1.43)
+ # If the last element of the list does not end in a newline, then treat # it as a partial line. We only check for '\n' here because a line # ending with '\r' might be a line that was split in the middle of a # '\r\n' sequence (see bugs 1555570 and 1721862).
if parts and not parts[-1].endswith('\n'):[](#l1.49)
self._partial = parts.pop()[](#l1.50)
if not parts[-1].endswith('\n'):[](#l1.51)
self._partial = [parts.pop()][](#l1.52) self.pushlines(parts)[](#l1.53)
--- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -10,6 +10,7 @@ import textwrap from io import StringIO, BytesIO from itertools import chain +from random import choice import email import email.policy @@ -3353,16 +3354,70 @@ Do you like this message? bsf.push(il) nt += n n1 = 0
while True:[](#l2.15)
ol = bsf.readline()[](#l2.16)
if ol == NeedMoreData:[](#l2.17)
break[](#l2.18)
for ol in iter(bsf.readline, NeedMoreData):[](#l2.19) om.append(ol)[](#l2.20) n1 += 1[](#l2.21) self.assertEqual(n, n1)[](#l2.22) self.assertEqual(len(om), nt)[](#l2.23) self.assertEqual(''.join([il for il, n in imt]), ''.join(om))[](#l2.24)
n = 10000[](#l2.29)
chunksize = 5[](#l2.30)
chars = 'abcd \t\r\n'[](#l2.31)
s = ''.join(choice(chars) for i in range(n)) + '\n'[](#l2.33)
target = s.splitlines(True)[](#l2.34)
bsf = BufferedSubFile()[](#l2.36)
lines = [][](#l2.37)
for i in range(0, len(s), chunksize):[](#l2.38)
chunk = s[i:i+chunksize][](#l2.39)
bsf.push(chunk)[](#l2.40)
lines.extend(iter(bsf.readline, NeedMoreData))[](#l2.41)
self.assertEqual(lines, target)[](#l2.42)
+ + +class TestFeedParsers(TestEmailBase): +
- def parse(self, chunks):
from email.feedparser import FeedParser[](#l2.48)
feedparser = FeedParser()[](#l2.49)
for chunk in chunks:[](#l2.50)
feedparser.feed(chunk)[](#l2.51)
return feedparser.close()[](#l2.52)
- def test_newlines(self):
m = self.parse(['a:\nb:\rc:\r\nd:\n'])[](#l2.55)
self.assertEqual(m.keys(), ['a', 'b', 'c', 'd'])[](#l2.56)
m = self.parse(['a:\nb:\rc:\r\nd:'])[](#l2.57)
self.assertEqual(m.keys(), ['a', 'b', 'c', 'd'])[](#l2.58)
m = self.parse(['a:\rb', 'c:\n'])[](#l2.59)
self.assertEqual(m.keys(), ['a', 'bc'])[](#l2.60)
m = self.parse(['a:\r', 'b:\n'])[](#l2.61)
self.assertEqual(m.keys(), ['a', 'b'])[](#l2.62)
m = self.parse(['a:\r', '\nb:\n'])[](#l2.63)
self.assertEqual(m.keys(), ['a', 'b'])[](#l2.64)
m = self.parse(['a:\x85b:\u2028c:\n'])[](#l2.65)
self.assertEqual(m.items(), [('a', '\x85'), ('b', '\u2028'), ('c', '')])[](#l2.66)
m = self.parse(['a:\r', 'b:\x85', 'c:\n'])[](#l2.67)
self.assertEqual(m.items(), [('a', ''), ('b', '\x85'), ('c', '')])[](#l2.68)
- def test_long_lines(self):
M, N = 1000, 100000[](#l2.71)
m = self.parse(['a:b\n\n'] + ['x'*M] * N)[](#l2.72)
self.assertEqual(m.items(), [('a', 'b')])[](#l2.73)
self.assertEqual(m.get_payload(), 'x'*M*N)[](#l2.74)
m = self.parse(['a:b\r\r'] + ['x'*M] * N)[](#l2.75)
self.assertEqual(m.items(), [('a', 'b')])[](#l2.76)
self.assertEqual(m.get_payload(), 'x'*M*N)[](#l2.77)
m = self.parse(['a:b\r\r'] + ['x'*M+'\x85'] * N)[](#l2.78)
self.assertEqual(m.items(), [('a', 'b')])[](#l2.79)
self.assertEqual(m.get_payload(), ('x'*M+'\x85')*N)[](#l2.80)
m = self.parse(['a:\r', 'b: '] + ['x'*M] * N)[](#l2.81)
self.assertEqual(m.items(), [('a', ''), ('b', 'x'*M*N)])[](#l2.82)
class TestParsers(TestEmailBase):
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -115,6 +115,9 @@ Core and Builtins Library ------- +- Issue #21448: Changed FeedParser feed() to avoid O(N**2) behavior when