cpython: c910af2e3c98 (original) (raw)
Mercurial > cpython
changeset 77635:c910af2e3c98
#4489: Add a shutil.rmtree that isn't suspectible to symlink attacks It is used automatically on platforms supporting the necessary os.openat() and os.unlinkat() functions. Main code by Martin von Löwis. [#4489]
Hynek Schlawack hs@ox.cx | |
---|---|
date | Sat, 23 Jun 2012 17:58:42 +0200 |
parents | 46e29122d3f7 |
children | 93263cd0b7d0 |
files | Doc/library/shutil.rst Lib/shutil.py Lib/test/test_shutil.py Misc/NEWS |
diffstat | 4 files changed, 150 insertions(+), 43 deletions(-)[+] [-] Doc/library/shutil.rst 27 Lib/shutil.py 99 Lib/test/test_shutil.py 63 Misc/NEWS 4 |
line wrap: on
line diff
--- a/Doc/library/shutil.rst +++ b/Doc/library/shutil.rst @@ -190,14 +190,27 @@ Directory and files operations handled by calling a handler specified by onerror or, if that is omitted, they raise an exception.
- .. warning:: +
The default :func:`rmtree` function is susceptible to a symlink attack:[](#l1.9)
given proper timing and circumstances, attackers can use it to delete[](#l1.10)
files they wouldn't be able to access otherwise. Thus -- on platforms[](#l1.11)
that support the necessary fd-based functions :func:`os.openat` and[](#l1.12)
:func:`os.unlinkat` -- a safe version of :func:`rmtree` is used, which[](#l1.13)
isn't vulnerable.[](#l1.14)
+ If onerror is provided, it must be a callable that accepts three
- parameters: function, path, and excinfo. The first parameter,
- function, is the function which raised the exception; it will be
- :func:
os.path.islink
, :func:os.listdir
, :func:os.remove
or - :func:
os.rmdir
. The second parameter, path, will be the path name passed - to function. The third parameter, excinfo, will be the exception
- information return by :func:
sys.exc_info
. Exceptions raised by onerror - will not be caught.
- parameters: function, path, and excinfo. +
- The first parameter, function, is the function which raised the exception;
- it depends on the platform and implementation. The second parameter,
- path, will be the path name passed to function. The third parameter,
- excinfo, will be the exception information returned by
- :func:
sys.exc_info
. Exceptions raised by onerror will not be caught. + - .. versionchanged:: 3.3
Added a safe version that is used automatically if platform supports[](#l1.33)
the fd-based functions :func:`os.openat` and :func:`os.unlinkat`.[](#l1.34)
--- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -337,23 +337,8 @@ def copytree(src, dst, symlinks=False, i raise Error(errors) return dst -def rmtree(path, ignore_errors=False, onerror=None):
- If ignore_errors is set, errors are ignored; otherwise, if onerror
- is set, it is called to handle the error with arguments (func,
- path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
- path is the argument to that function that caused it to fail; and
- exc_info is a tuple returned by sys.exc_info(). If ignore_errors
- is false and onerror is None, an exception is raised.
- """
- if ignore_errors:
def onerror(*args):[](#l2.19)
pass[](#l2.20)
- elif onerror is None:
def onerror(*args):[](#l2.22)
raise[](#l2.23)
+# version vulnerable to race conditions +def _rmtree_unsafe(path, onerror): try: if os.path.islink(path): # symlinks to directories are forbidden, see bug #1669 @@ -374,7 +359,7 @@ def rmtree(path, ignore_errors=False, on except os.error: mode = 0 if stat.S_ISDIR(mode):
rmtree(fullname, ignore_errors, onerror)[](#l2.33)
_rmtree_unsafe(fullname, onerror)[](#l2.34) else:[](#l2.35) try:[](#l2.36) os.remove(fullname)[](#l2.37)
@@ -385,6 +370,84 @@ def rmtree(path, ignore_errors=False, on except os.error: onerror(os.rmdir, path, sys.exc_info()) +# Version using fd-based APIs to protect against races +def _rmtree_safe_fd(topfd, path, onerror):
- names = []
- try:
names = os.flistdir(topfd)[](#l2.46)
- except os.error:
onerror(os.flistdir, path, sys.exc_info())[](#l2.48)
- for name in names:
fullname = os.path.join(path, name)[](#l2.50)
try:[](#l2.51)
orig_st = os.fstatat(topfd, name)[](#l2.52)
mode = orig_st.st_mode[](#l2.53)
except os.error:[](#l2.54)
mode = 0[](#l2.55)
if stat.S_ISDIR(mode):[](#l2.56)
try:[](#l2.57)
dirfd = os.openat(topfd, name, os.O_RDONLY)[](#l2.58)
except os.error:[](#l2.59)
onerror(os.openat, fullname, sys.exc_info())[](#l2.60)
else:[](#l2.61)
try:[](#l2.62)
if os.path.samestat(orig_st, os.fstat(dirfd)):[](#l2.63)
_rmtree_safe_fd(dirfd, fullname, onerror)[](#l2.64)
finally:[](#l2.65)
os.close(dirfd)[](#l2.66)
else:[](#l2.67)
try:[](#l2.68)
os.unlinkat(topfd, name)[](#l2.69)
except os.error:[](#l2.70)
onerror(os.unlinkat, fullname, sys.exc_info())[](#l2.71)
- try:
os.rmdir(path)[](#l2.73)
- except os.error:
onerror(os.rmdir, path, sys.exc_info())[](#l2.75)
+ +_use_fd_functions = hasattr(os, 'openat') and hasattr(os, 'unlinkat') +def rmtree(path, ignore_errors=False, onerror=None):
- If ignore_errors is set, errors are ignored; otherwise, if onerror
- is set, it is called to handle the error with arguments (func,
- path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
- path is the argument to that function that caused it to fail; and
- exc_info is a tuple returned by sys.exc_info(). If ignore_errors
- is false and onerror is None, an exception is raised.
- """
- if ignore_errors:
def onerror(*args):[](#l2.90)
pass[](#l2.91)
- elif onerror is None:
def onerror(*args):[](#l2.93)
raise[](#l2.94)
- if _use_fd_functions:
# Note: To guard against symlink races, we use the standard[](#l2.96)
# lstat()/open()/fstat() trick.[](#l2.97)
try:[](#l2.98)
orig_st = os.lstat(path)[](#l2.99)
except Exception:[](#l2.100)
onerror(os.lstat, path, sys.exc_info())[](#l2.101)
return[](#l2.102)
try:[](#l2.103)
fd = os.open(path, os.O_RDONLY)[](#l2.104)
except Exception:[](#l2.105)
onerror(os.lstat, path, sys.exc_info())[](#l2.106)
return[](#l2.107)
try:[](#l2.108)
if (stat.S_ISDIR(orig_st.st_mode) and[](#l2.109)
os.path.samestat(orig_st, os.fstat(fd))):[](#l2.110)
_rmtree_safe_fd(fd, path, onerror)[](#l2.111)
elif (stat.S_ISREG(orig_st.st_mode)):[](#l2.112)
raise NotADirectoryError(20,[](#l2.113)
"Not a directory: '{}'".format(path))[](#l2.114)
finally:[](#l2.115)
os.close(fd)[](#l2.116)
- else:
return _rmtree_unsafe(path, onerror)[](#l2.118)
+ def _basename(path): # A basename() variant which first strips the trailing slash, if present.
--- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -120,29 +120,36 @@ class TestShutil(unittest.TestCase): def test_on_error(self): self.errorState = 0 os.mkdir(TESTFN)
self.childpath = os.path.join(TESTFN, 'a')[](#l3.7)
support.create_empty_file(self.childpath)[](#l3.8)
self.child_file_path = os.path.join(TESTFN, 'a')[](#l3.9)
self.child_dir_path = os.path.join(TESTFN, 'b')[](#l3.10)
support.create_empty_file(self.child_file_path)[](#l3.11)
os.mkdir(self.child_dir_path)[](#l3.12) old_dir_mode = os.stat(TESTFN).st_mode[](#l3.13)
old_child_mode = os.stat(self.childpath).st_mode[](#l3.14)
old_child_file_mode = os.stat(self.child_file_path).st_mode[](#l3.15)
old_child_dir_mode = os.stat(self.child_dir_path).st_mode[](#l3.16) # Make unwritable.[](#l3.17)
os.chmod(self.childpath, stat.S_IREAD)[](#l3.18)
os.chmod(TESTFN, stat.S_IREAD)[](#l3.19)
new_mode = stat.S_IREAD|stat.S_IEXEC[](#l3.20)
os.chmod(self.child_file_path, new_mode)[](#l3.21)
os.chmod(self.child_dir_path, new_mode)[](#l3.22)
os.chmod(TESTFN, new_mode)[](#l3.23)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) # Test whether onerror has actually been called.
self.assertEqual(self.errorState, 2,[](#l3.27)
"Expected call to onerror function did not happen.")[](#l3.28)
self.assertEqual(self.errorState, 3,[](#l3.29)
"Expected call to onerror function did not "[](#l3.30)
"happen.")[](#l3.31)
# Make writable again. os.chmod(TESTFN, old_dir_mode)
os.chmod(self.childpath, old_child_mode)[](#l3.35)
os.chmod(self.child_file_path, old_child_file_mode)[](#l3.36)
os.chmod(self.child_dir_path, old_child_dir_mode)[](#l3.37)
# Clean up. shutil.rmtree(TESTFN) def check_args_to_onerror(self, func, arg, exc): # test_rmtree_errors deliberately runs rmtree
# on a directory that is chmod 400, which will fail.[](#l3.44)
# on a directory that is chmod 500, which will fail.[](#l3.45) # This function is run when shutil.rmtree fails.[](#l3.46) # 99.9% of the time it initially fails to remove[](#l3.47) # a file in the directory, so the first time through[](#l3.48)
@@ -151,20 +158,39 @@ class TestShutil(unittest.TestCase): # FUSE experienced a failure earlier in the process # at os.listdir. The first failure may legally # be either.
if self.errorState == 0:[](#l3.53)
if func is os.remove:[](#l3.54)
self.assertEqual(arg, self.childpath)[](#l3.55)
if 0 <= self.errorState < 2:[](#l3.56)
if (func is os.remove or[](#l3.57)
hasattr(os, 'unlinkat') and func is os.unlinkat):[](#l3.58)
self.assertIn(arg, [self.child_file_path, self.child_dir_path])[](#l3.59) else:[](#l3.60)
self.assertIs(func, os.listdir,[](#l3.61)
"func must be either os.remove or os.listdir")[](#l3.62)
self.assertEqual(arg, TESTFN)[](#l3.63)
if self.errorState == 1:[](#l3.64)
self.assertEqual(func, os.rmdir)[](#l3.65)
else:[](#l3.66)
self.assertIs(func, os.listdir, "func must be os.listdir")[](#l3.67)
self.assertIn(arg, [TESTFN, self.child_dir_path])[](#l3.68) self.assertTrue(issubclass(exc[0], OSError))[](#l3.69)
self.errorState = 1[](#l3.70)
self.errorState += 1[](#l3.71) else:[](#l3.72) self.assertEqual(func, os.rmdir)[](#l3.73) self.assertEqual(arg, TESTFN)[](#l3.74) self.assertTrue(issubclass(exc[0], OSError))[](#l3.75)
self.errorState = 2[](#l3.76)
self.errorState = 3[](#l3.77)
- def test_rmtree_does_not_choke_on_failing_lstat(self):
try:[](#l3.80)
orig_lstat = os.lstat[](#l3.81)
def raiser(fn):[](#l3.82)
if fn != TESTFN:[](#l3.83)
raise OSError()[](#l3.84)
else:[](#l3.85)
return orig_lstat(fn)[](#l3.86)
os.lstat = raiser[](#l3.87)
os.mkdir(TESTFN)[](#l3.89)
write_file((TESTFN, 'foo'), 'foo')[](#l3.90)
shutil.rmtree(TESTFN)[](#l3.91)
finally:[](#l3.92)
os.lstat = orig_lstat[](#l3.93)
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') @support.skip_unless_symlink @@ -464,7 +490,7 @@ class TestShutil(unittest.TestCase): # When called on a file instead of a directory, don't delete it. handle, path = tempfile.mkstemp() os.close(handle)
self.assertRaises(OSError, shutil.rmtree, path)[](#l3.101)
self.assertRaises(NotADirectoryError, shutil.rmtree, path)[](#l3.102) os.remove(path)[](#l3.103)
def test_copytree_simple(self): @@ -629,6 +655,7 @@ class TestShutil(unittest.TestCase): os.mkdir(src) os.symlink(src, dst) self.assertRaises(OSError, shutil.rmtree, dst)
shutil.rmtree(dst, ignore_errors=True)[](#l3.110) finally:[](#l3.111) shutil.rmtree(TESTFN, ignore_errors=True)[](#l3.112)
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -43,6 +43,10 @@ Core and Builtins Library ------- +- Issue #4489: Add a shutil.rmtree that isn't suspectible to symlink attacks.