python的零碎知识
阅读原文时间:2023年07月10日阅读:2

1.Python代码操作git

  • 安装

    pip3 install gitpython
  • 操作git

    import os
    from git.repo import Repo # gitpython
    
    def clone():
        download_path = os.path.join('codes', 'fuck')
    # git clone -b master https://gitee.com/wupeiqi/xxoo.git
    # git clone -b v1 https://gitee.com/wupeiqi/xxoo.git
    Repo.clone_from('https://gitee.com/wupeiqi/xxoo.git', to_path=download_path, branch='master')
    def pull(): # git pull origin master local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) repo.git.pull() def tag_list(): local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) tag_list = [item.name for item in repo.tags] print(tag_list) def commits_list(): import json local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) commit_log = repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',date='format:%Y-%m-%d %H:%M',max_count=50)
    commit_list = [json.loads(item) for item in commit_log.split('\n') ]
    print(commit_list)
    def branches_list(): pull() local_path = os.path.join('codes', 'fuck') repo = Repo(local_path)
    branches = repo.remote().refs
    branch_list = [item.remote_head for item in branches if item.remote_head != "HEAD"]
    def checkout_to_branch(branch='dev'): local_path = os.path.join('codes', 'fuck') repo = Repo(local_path)
    # before = repo.git.branch()
    # print('当前所在分支:',before)
    
    repo.git.checkout(branch)
    
    # after = repo.git.branch()
    # print('当前所在分支:',after)
    def checkout_to_commit(commit='ec1d728'): # commits_list() local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) repo.git.reset('--hard', commit)
  • 封装到一个类中,以后当做工具。

    import os
    import json
    from git.repo import Repo
    from git.repo.fun import is_git_dir
    
    class GitRepository(object):
        """
        git仓库管理
        """
    def __init__(self, local_path, repo_url, branch='master'):
        #本地存储代码的路径 E:\Django_issue\code_issue\codes\xxx
        self.local_path = local_path
        #远程仓库地址 https://gite.com/zhanyu/xxx.git
        self.repo_url = repo_url
        # 这个默认为空下边自动赋值
        self.repo = None
        # 调用下边的方法
        self.initial(branch)
    
    def initial(self,branch):
        """
        初始化git仓库
        :param repo_url:
        :param branch:
        :return:
        """
        # 判断本地的仓库中有没有文件,没有就创建
        if not os.path.exists(self.local_path):
            os.makedirs(self.local_path)
        # codes/luffycity/.git
        git_local_path = os.path.join(self.local_path, '.git')
        # 用来判断是不是有这个文件
        if not is_git_dir(git_local_path):
            # 第一次拉文件的时候是需要克隆的,不能直接拉取
            self.repo = Repo.clone_from(self.repo_url, to_path=self.local_path, branch=branch)
        else:
            # 如果有这个.git文件就实例化Repo这个类;
            self.repo = Repo(self.local_path)
    
    def pull(self):
        """
        从线上拉最新代码
        :return:
        """
        self.repo.git.pull()
    
    def branches(self):
        """
        获取所有分支
        :return:
        """
        branches = self.repo.remote().refs
        return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]]
    
    def commits(self):
        """
        获取所有提交记录
        :return:
        """
        commit_log = self.repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',
                                       max_count=50,
                                       date='format:%Y-%m-%d %H:%M')
        return [json.loads(item) for item in commit_log.split('\n') ]
    
    def tags(self):
        """
        获取所有tag
        :return:
        """
        return [tag.name for tag in self.repo.tags]
    
    def change_to_branch(self, branch):
        """
        切换分值
        :param branch:
        :return:
        """
        self.repo.git.checkout(branch)
    
    def change_to_commit(self, branch, commit):
        """
        切换commit
        :param branch:
        :param commit:
        :return:
        """
        self.change_to_branch(branch=branch)
        self.repo.git.reset('--hard', commit)
    
    def change_to_tag(self, tag):
        """
        切换tag
        :param tag:
        :return:
        """
      self.repo.git.checkout(tag)
    if __name__ == '__main__': local_path = os.path.join('codes', 'luffycity') repo_object = GitRepository(local_path, 'https://gitee.com/wupeiqi/xxoo.git')

2.解压缩文件

import shutil

# 压缩文件: py2、py3
"""
abs_file_path = shutil.make_archive(
    base_name="files/ww",  # 压缩包文件路劲
    format='tar',  # “zip”, “tar”
    root_dir='codes/luffycity'  # 被压缩的文件目录
)
print(abs_file_path)
"""

# 解压缩:py3
# shutil._unpack_zipfile('files/ww.zip', 'xxxxxx/new')
# shutil._unpack_tarfile('files/ww.zip', 'xxxxxx/new')

# 解压缩:py2/py3
"""
import zipfile
z = zipfile.ZipFile('files/ww.zip', 'r')
z.extractall(path='xxxxxx/luffy')
z.close()

import tarfile
tar = tarfile.TarFile('code/www.tar', 'r')
tar.extractall(path='/code/x1/')  # 可设置解压地址
tar.close()
"""

3.基于paramiko操作远程服务器

import paramiko

class SSHProxy(object):

    def __init__(self, hostname, port, username, private_key_path):
        self.hostname = hostname
        self.port = port
        self.username = username
        self.private_key_path = private_key_path

        self.transport = None

    def open(self):
        private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path)
        self.transport = paramiko.Transport((self.hostname, self.port))
        self.transport.connect(username=self.username, pkey=private_key)

    def close(self):
        self.transport.close()

    def command(self, cmd):
        ssh = paramiko.SSHClient()
        ssh._transport = self.transport
        stdin, stdout, stderr = ssh.exec_command(cmd)
        result = stdout.read()
        ssh.close()
        return result

    def upload(self, local_path, remote_path):
        sftp = paramiko.SFTPClient.from_transport(self.transport)
        sftp.put(local_path, remote_path)
        sftp.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

if __name__ == '__main__':
    with SSHProxy('10.211.55.25', 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
        proxy.upload('xx','xx')
        proxy.command('ifconfig')
        proxy.command('ifconfig')
        proxy.upload('xx', 'xx')
    with SSHProxy('10.211.55.26', 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
        proxy.upload('xx','xx')
        proxy.command('ifconfig')
        proxy.command('ifconfig')
        proxy.upload('xx', 'xx')

4.本地执行命令

import subprocess

result = subprocess.check_output('dir', cwd='D:\wupeiqi\code\ziwen\codes', shell=True)
print(result.decode('gbk'), type(result))