bpo-22367: Update test_fcntl.py for spawn process mode (#17154) · python/cpython@9960230 (original) (raw)
`@@ -51,6 +51,21 @@ def init(self, fn):
`
51
51
`def fileno(self):
`
52
52
`return self.fn
`
53
53
``
``
54
`+
def try_lockf_on_other_process_fail(fname, cmd):
`
``
55
`+
f = open(fname, 'wb+')
`
``
56
`+
try:
`
``
57
`+
fcntl.lockf(f, cmd)
`
``
58
`+
except BlockingIOError:
`
``
59
`+
pass
`
``
60
`+
finally:
`
``
61
`+
f.close()
`
``
62
+
``
63
`+
def try_lockf_on_other_process(fname, cmd):
`
``
64
`+
f = open(fname, 'wb+')
`
``
65
`+
fcntl.lockf(f, cmd)
`
``
66
`+
fcntl.lockf(f, fcntl.LOCK_UN)
`
``
67
`+
f.close()
`
``
68
+
54
69
`class TestFcntl(unittest.TestCase):
`
55
70
``
56
71
`def setUp(self):
`
`@@ -138,28 +153,23 @@ def test_flock(self):
`
138
153
`self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
`
139
154
`self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
`
140
155
``
``
156
`+
@unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
`
141
157
`def test_lockf_exclusive(self):
`
142
158
`self.f = open(TESTFN, 'wb+')
`
143
159
`cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
`
144
``
`-
def try_lockf_on_other_process():
`
145
``
`-
self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd)
`
146
``
-
147
160
`fcntl.lockf(self.f, cmd)
`
148
``
`-
p = Process(target=try_lockf_on_other_process)
`
``
161
`+
p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
`
149
162
`p.start()
`
150
163
`p.join()
`
151
164
`fcntl.lockf(self.f, fcntl.LOCK_UN)
`
152
165
`self.assertEqual(p.exitcode, 0)
`
153
166
``
``
167
`+
@unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
`
154
168
`def test_lockf_share(self):
`
155
169
`self.f = open(TESTFN, 'wb+')
`
156
170
`cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
`
157
``
`-
def try_lockf_on_other_process():
`
158
``
`-
fcntl.lockf(self.f, cmd)
`
159
``
`-
fcntl.lockf(self.f, fcntl.LOCK_UN)
`
160
``
-
161
171
`fcntl.lockf(self.f, cmd)
`
162
``
`-
p = Process(target=try_lockf_on_other_process)
`
``
172
`+
p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
`
163
173
`p.start()
`
164
174
`p.join()
`
165
175
`fcntl.lockf(self.f, fcntl.LOCK_UN)
`