shutil模块的使用
阅读原文时间:2023年07月10日阅读:1

  shutil模块

  高级的文件,文件夹,压缩包处理模块

  shutil.copyfileobj(fsrc, fdst, length)  将文件对象 fsrc 的内容拷贝到另一个文件对象 fdst 中(从 fsrc 当前读取位置开始,因此也可以只拷贝部分内容)。fdst 为目标文件对象,length 为每次读取的长度(默认 16*1024)

  使用方法

def copyfileobj(fsrc, fdst, length=16*1024):
    """Copy data from file-like object fsrc to file-like object fdst.

    Data is read from ``fsrc`` in chunks of at most ``length`` and written
    to ``fdst`` until ``fsrc.read()`` returns an empty (falsy) result.
    Reading starts at the current position of ``fsrc``.  Neither object is
    closed.  Returns None.
    """
    while True:
        buf = fsrc.read(length)
        if not buf:
            # An empty read signals end of input.
            break
        fdst.write(buf)

  shutil.copyfile(src,dst)  拷贝文件

  使用方法

def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Returns dst.  Raises SameFileError when src and dst are the same file
    and SpecialFileError when either path is a named pipe.

    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        # Recreate the link itself instead of copying its target's data.
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
            copyfileobj(fsrc, fdst)
    return dst

  shutil.copymode(src,dst)  仅拷贝权限。内容、组、用户不变

  使用方法

def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    File contents, owner and group are not touched.  Returns None.

    """
    if not follow_symlinks and os.path.islink(src) and os.path.islink(dst):
        # Operate on the links themselves; only possible with os.lchmod.
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            return
    elif hasattr(os, 'chmod'):
        stat_func, chmod_func = os.stat, os.chmod
    else:
        return

    st = stat_func(src)
    chmod_func(dst, stat.S_IMODE(st.st_mode))

if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Overwrite existing attributes.

        If `follow_symlinks` is false, symlinks won't be followed.

        Errors with errno ENOTSUP or ENODATA (and EPERM while copying an
        individual attribute) are ignored; any other OSError propagates.

        """

        try:
            names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as e:
            if e.errno not in (errno.ENOTSUP, errno.ENODATA):
                raise
            return
        for name in names:
            try:
                value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
                os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
            except OSError as e:
                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA):
                    raise
else:
    def _copyxattr(*args, **kwargs):
        """No-op fallback used when os.listxattr is unavailable."""
        pass

  shutil.copystat(src,dst)  拷贝状态的信息,包括:mode bits,atime,mtime,flags

  使用方法

def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path names given as strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    def _nop(*args, ns=None, follow_symlinks=None):
        # Stand-in for os functions that are missing on this platform.
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # chmod was asked not to follow symlinks (follow_symlinks=False)
        # but the platform cannot change the mode of a symlink itself.
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Ignore "operation not supported"; re-raise anything else.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
    _copyxattr(src, dst, follow_symlinks=follow)

  shutil.copy(src,dst)  拷贝文件和权限

  使用方法

def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.

    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's base name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst

  shutil.copy2(src,dst)  拷贝文件和状态信息

  使用方法

def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(). Please see the copystat function
    for more information.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's base name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst

  shutil.ignore_patterns(*patterns)

  shutil.copytree(src,dst,symlinks=False,ignore=None)  递归地拷贝文件夹:src 为源文件夹,dst 为拷贝生成的新文件夹名称(新文件夹如果已存在会报错);symlinks 默认为 False,此时不保留软连接,而是拷贝软连接所指向的文件内容;ignore=None 用于指定需要忽略的文件

  使用方法

  例如:shutil.copytree(src,dst,symlinks=False,ignore=shutil.ignore_patterns(*patterns))

def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files.

    Returns a callable ``(path, names) -> set`` that yields the subset of
    ``names`` matching any of the patterns; ``path`` is not inspected.
    """
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns

def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False):
    """Recursively copy a directory tree.

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. Notice that this has no effect on
    platforms that don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.

    Returns dst.

    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.islink(srcname):
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcname, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on
                    if not os.path.exists(linkto) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if os.path.isdir(srcname):
                        # Pass ignore_dangling_symlinks down so the flag also
                        # applies inside nested directories.
                        copytree(srcname, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks)
                    else:
                        copy_function(srcname, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst

version vulnerable to race conditions

  shutil.rmtree(path, ignore_errors=False, onerror=None)  递归的去删除文件(删除所有文件)

  使用方法

def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Always invoked from inside an ``except`` block, so a bare
            # raise re-raises the exception currently being handled.
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
        except Exception:
            # Report the function that actually failed (os.open).
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            os.close(fd)
    else:
        try:
            if os.path.islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions

  shutil.move(src,dst)  递归的移动文件(类似于重命名)

  使用方法

def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            # Note: this path returns None, not the destination.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed; fall back to copy-then-delete.
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst

  shutil.make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,dry_run=0, owner=None, group=None, logger=None)

  base_name:压缩包的文件名,也可以是压缩包的路径,只是文件名时,则保存至当前目录,否则保存至指定路径,

  如:wenjianming    ==>保存至当前路径

  如:/users/kevin/wenjianming  ==>保存至/users/kevin/

  format:压缩包种类,"zip","tar","bztar","gztar"

  root_dir:要压缩的文件夹路径(默认当前目录)

  owner=None, group=None, logger=None  用户(默认当前),组(默认当前),用于记录日志,通常是logging.Logger对象

  使用方法

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    Raises ValueError if 'format' is not a registered archive format.
    """
    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        # Make base_name absolute before chdir so the archive is still
        # written relative to the caller's original working directory.
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        # Restore the caller's working directory even if archiving failed.
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename

  shutil 对压缩包的处理是调用了 zipfile 和 tarfile 两个模块来进行的。

  压缩(当前的)

import zipfile

# Compress: write the listed files into a new zip archive.
# The with-statement closes the archive even if a write fails.
with zipfile.ZipFile('kevin.zip', 'w') as z:
    z.write('a.log')
    z.write('data.data')

# Extract: unpack every member of the archive into the current directory.
with zipfile.ZipFile('kevin.zip', 'r') as z:
    z.extractall()

  打包(只打包不压缩(所有的))

import tarfile

# Pack: add the files to a new, uncompressed tar archive.
# arcname must be a string; it is the member's name inside the archive.
with tarfile.open('kevin.tar', 'w') as tar:
    tar.add('/users/Kevin/Pycharmprojects/kevin.zip', arcname='kevin.zip')
    tar.add('/users/Kevin/Pycharmprojects/roc.zip', arcname='roc.zip')

# Unpack: extract every member of the archive.
with tarfile.open('kevin.tar', 'r') as tar:
    tar.extractall()  # a destination directory can be passed as an argument