compression.py (original) (raw)

import sys

import os

from os.path import (abspath, basename, exists, isfile, isdir,

pardir, join, dirname, relpath, realpath)

import tarfile

import zipfile

from contextlib import closing

from pypm.common import console

def unpack(file, path='.'):
    """Extract ``file`` under the directory ``path``

    Each supported implementor is asked, in order (gzip tar, zip, bzip2 tar),
    whether it recognizes ``file``; the first one that does performs the
    extraction from inside ``path``.

    Return a two-item list: [path to the unpacked directory, implementor used]

    Raise ``InvalidFile`` if no implementor recognizes ``file``. Errors raised
    by the chosen implementor's ``extract()`` propagate unchanged.
    """
    assert isfile(file)
    assert isdir(path)

    # Resolve the archive location up front: ``is_valid`` below sees ``file``
    # relative to the caller's directory, but ``extract()`` runs inside
    # ``console.cd(path)`` (presumably a chdir context manager), where a
    # relative ``file`` would no longer point at the same archive.
    archive = abspath(file)

    for implementor in (GzipTarredFile, ZippedFile, Bzip2TarredFile):
        if implementor.is_valid(file):
            with console.cd(path):
                return [implementor(archive).extract(), implementor]

    # Reached only when no implementor matched (the loop returns on a match).
    raise InvalidFile('compressed file format unknown: %s' % file)

def pack_contents(file, d, implementor=None):
    """Archive every entry found directly inside directory ``d`` into ``file``

    The entries (not ``d`` itself) become the top level of the archive.
    ``implementor`` is passed through to ``_pack_ex``.

    Return whatever ``_pack_ex`` returns.
    """
    # Work with the symlink-free location so that the relative paths computed
    # in _pack_ex are not confused by symlinks.
    root = realpath(d)
    entries = os.listdir(root)
    return _pack_ex(file, entries, root, implementor)

def pack_single(file, path, implementor=None):
    """Archive one path (typically a directory) into ``file``

    The path is packed relative to its parent directory. ``implementor`` is
    passed through to ``_pack_ex``.

    Return whatever ``_pack_ex`` returns.
    """
    # Work with the symlink-free location so that the relative paths computed
    # in _pack_ex are not confused by symlinks.
    resolved = realpath(path)
    parent = dirname(resolved)
    return _pack_ex(file, [resolved], parent, implementor)

def _pack_ex(file, names, cwd, implementor=None):
    """Compress the paths given by ``names`` from directory ``cwd``

    Compressed file is named ``file`` using ``implementor`` (defaults to
    ``GzipTarredFile``). Any existing ``file`` is removed first. Each name is
    stored in the archive relative to ``cwd``.

    Return ``file`` exactly as it was passed in.
    """
    assert isdir(cwd)

    # Pin both locations before changing directory. Packing happens inside
    # ``console.cd(cwd)`` (presumably a chdir context manager); a relative
    # ``file`` would otherwise be checked/removed in the caller's directory
    # but created inside ``cwd``, and a relative ``cwd`` would be resolved
    # against itself by ``relpath``.
    target = abspath(file)
    base = abspath(cwd)

    if exists(target):
        console.rm(target)

    if not implementor:
        implementor = GzipTarredFile

    with console.cd(base):
        # Relative names (e.g. from os.listdir) are interpreted against the
        # directory we just entered; absolute ones are made relative to it.
        relnames = [relpath(name, base) for name in names]
        implementor.pack(relnames, target)

    return file

class CompressedFile(object):
    """Common base for the archive wrappers; holds the archive's file name"""

    def __init__(self, filename):
        """Remember ``filename`` (stored as given) for later use by subclasses"""
        self.filename = filename

class ZippedFile(CompressedFile):
    """A zip file"""

    @staticmethod
    def is_valid(filename):
        """Return True if ``zipfile.is_zipfile`` accepts ``filename``"""
        return zipfile.is_zipfile(filename)

    def extract(self):
        """Extract all members into the current working directory

        Return the directory reported by ``_possible_dir_name`` for the
        archive's member list.

        Raise ``InvalidFile`` when the archive is a bad or too-large zip file,
        when extraction fails with OS error number 17, or (on Windows) with
        Windows error 267. Any other ``OSError`` is re-raised unchanged.
        """
        try:
            f = zipfile.ZipFile(self.filename, 'r')
            try:
                f.extractall()
                return _possible_dir_name(f.namelist())
            except OSError as e:
                if e.errno == 17:  # EEXIST on common platforms
                    # http://bugs.python.org/issue6510
                    raise InvalidFile(e)
                # http://bugs.python.org/issue6609
                if sys.platform.startswith('win'):
                    # WindowsError is only looked up on Windows, where the
                    # name exists.
                    if isinstance(e, WindowsError) and e.winerror == 267:
                        raise InvalidFile(
                            'uses Windows special name (%s)' % e)
                raise
            finally:
                f.close()
        except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
            raise InvalidFile(e)

    @classmethod
    def pack(cls, paths, file):
        """Not supported for zip files; always raises ``NotImplementedError``"""
        raise NotImplementedError('pack: zip files not supported yet')

class TarredFile(CompressedFile):
    """A tar.gz/bz2 file

    Abstract base: subclasses supply the compression-specific ``tarfile.open``
    mode through ``_get_mode``.
    """

    @classmethod
    def is_valid(cls, filename):
        """Return True if ``filename`` can be opened as this kind of tarball

        Only ``tarfile.TarError`` is treated as "not valid"; other errors
        propagate.
        """
        try:
            with closing(tarfile.open(filename, cls._get_mode())):
                return True
        except tarfile.TarError:
            return False

    def extract(self):
        """Extract all members into the current working directory

        Member permissions are widened first (see
        ``_ensure_read_write_access``) so the extracted tree stays accessible.

        Return the directory reported by ``_possible_dir_name`` for the
        archive's member names.

        Raise ``InvalidFile`` on any ``tarfile.TarError`` or on an ``IOError``
        whose message mentions a failed CRC check; other ``IOError``s are
        re-raised unchanged.
        """
        try:
            f = tarfile.open(self.filename, self._get_mode())
            try:
                _ensure_read_write_access(f)
                # NOTE(review): member names are not checked for absolute
                # paths or '..' components before extraction -- confirm that
                # archives are trusted or that the running Python applies an
                # extraction filter.
                f.extractall()
                return _possible_dir_name(f.getnames())
            finally:
                f.close()
        except tarfile.TarError as e:
            raise InvalidFile(e)
        except IOError as e:
            # see http://bugs.python.org/issue6584
            if 'CRC check failed' in str(e):
                raise InvalidFile(e)
            else:
                raise

    @classmethod
    def pack(cls, paths, file):
        """Create the tarball ``file`` containing each of ``paths``

        Every path must exist (asserted); paths are added in the given order
        and are resolved against the current working directory.
        """
        f = tarfile.open(file, cls._get_mode('w'))
        try:
            for path in paths:
                assert exists(path), '"%s" does not exist' % path
                f.add(path)
        finally:
            f.close()

    @staticmethod
    def _get_mode(mode='r'):
        """Return the ``tarfile.open`` mode for this tarfile

        ``mode`` is 'r' or 'w'. The signature matches the subclass overrides
        and the way ``is_valid``/``extract``/``pack`` call it. Subclasses must
        override; the base raises ``NotImplementedError``.
        """
        raise NotImplementedError

class GzipTarredFile(TarredFile):
    """A gzip-compressed tar (tar.gz) file"""

    @staticmethod
    def _get_mode(mode='r'):
        """Return the gzip ``tarfile.open`` mode: 'r:gz' or 'w:gz'"""
        assert mode in ('r', 'w')
        return ''.join([mode, ':gz'])

class Bzip2TarredFile(TarredFile):
    """A bzip2-compressed tar (tar.bz2) file"""

    @staticmethod
    def _get_mode(mode='r'):
        """Return the bzip2 ``tarfile.open`` mode: 'r:bz2' or 'w:bz2'"""
        assert mode in ('r', 'w')
        return ''.join([mode, ':bz2'])

class InvalidFile(Exception):
    """Raised when a compressed file is invalid and cannot be properly
    extracted
    """

class MultipleTopLevels(InvalidFile):
    """Raised when the archive extracts fine but has more than one top-level
    entry
    """

class SingleFile(InvalidFile):
    """Raised when the archive holds nothing but a single file

    A compressed archive is expected to contain one directory.
    """

def _possible_dir_name(contents):
    """The directory where the files are possibly extracted.

    ``contents`` is the list of member names of an archive that has already
    been extracted into the current working directory ('/' is taken as the
    separator inside member names).

    Return the absolute path of the archive's single top-level directory.

    Raise ``InvalidFile`` if there are no members, ``MultipleTopLevels`` if
    the members do not share one first path component, and ``SingleFile`` if
    the lone top-level entry is not a directory.
    """
    top_level_dirs = _find_top_level_directories(contents, sep='/')

    if not top_level_dirs:
        raise InvalidFile('has no contents')
    if len(top_level_dirs) > 1:
        raise MultipleTopLevels(
            'more than one top levels: %s' % top_level_dirs)

    # Resolved against the current working directory, i.e. where the caller
    # extracted the archive.
    d = abspath(top_level_dirs[0])
    assert exists(d), 'missing dir: %s' % d

    if not isdir(d):
        # eg: http://pypi.python.org/pypi/DeferArgs/0.4
        raise SingleFile('contains a single file: %s' % d)

    return d

def _ensure_read_write_access(tarfileobj):
    """Ensure that the given tarfile will be readable and writable by the
    user (the client program using this API) after extraction.

    Some tarballs have u-x set on directories or u-w on files. We reset such
    perms here.. so that the extracted files remain accessible for reading
    and deletion as per the user's wish.

    Every member's ``mode`` is updated in place: directories gain owner
    read/write/execute, everything else gains owner read/write. Bits that are
    already set are never cleared. Returns nothing.

    See also: http://bugs.python.org/issue6196
    """
    # Imported here because the module does not import ``stat`` at top level.
    # The stat constants carry the same owner-permission bits as the
    # undocumented tarfile.TUREAD/TUWRITE/TUEXEC, which newer Python releases
    # no longer provide.
    import stat

    file_perm = stat.S_IRUSR | stat.S_IWUSR
    dir_perm = file_perm | stat.S_IXUSR

    for tarinfo in tarfileobj.getmembers():
        tarinfo.mode |= (dir_perm if tarinfo.isdir() else file_perm)

def _find_top_level_directories(fileslist, sep):
    """Return the distinct first path components found in ``fileslist``

    Each entry is cut at the first occurrence of ``sep``; an entry without
    ``sep`` counts as its own first component. The result is a list with no
    duplicates, in no particular order.
    """
    first_components = set(entry.split(sep, 1)[0] for entry in fileslist)
    return list(first_components)