安装
pip3 install gitpython
操作git
import os
from git.repo import Repo # gitpython
def clone():
    """Clone the ``master`` branch of the demo repository into ``codes/fuck``.

    Shell equivalent::

        git clone -b master https://gitee.com/wupeiqi/xxoo.git

    Another ref is selected the same way as ``git clone -b v1 ...``, i.e. by
    changing the ``branch`` argument passed to ``Repo.clone_from``.
    """
    Repo.clone_from(
        'https://gitee.com/wupeiqi/xxoo.git',
        to_path=os.path.join('codes', 'fuck'),
        branch='master',
    )
def pull():
    """Run ``git pull`` inside the local checkout at ``codes/fuck``.

    No remote or branch is passed, so git uses the upstream configured for
    the branch that is currently checked out.
    """
    Repo(os.path.join('codes', 'fuck')).git.pull()
def tag_list():
    """Print the names of all tags found in the ``codes/fuck`` checkout."""
    checkout = Repo(os.path.join('codes', 'fuck'))
    names = []
    for tag in checkout.tags:
        names.append(tag.name)
    print(names)
def commits_list():
    """Print up to 50 of the most recent commits of the ``codes/fuck`` checkout.

    The output is a list of dicts with the keys ``commit`` (abbreviated
    hash), ``author``, ``summary`` and ``date`` (committer date formatted as
    ``YYYY-MM-DD HH:MM``).
    """
    local_path = os.path.join('codes', 'fuck')
    repo = Repo(local_path)

    # Fields are separated by the ASCII unit separator (git's ``%x1f``)
    # instead of being spliced into a JSON template: a quote or backslash in
    # a commit summary would otherwise produce invalid JSON.
    field_sep = '\x1f'
    keys = ('commit', 'author', 'summary', 'date')
    commit_log = repo.git.log(
        '--pretty=tformat:%h%x1f%an%x1f%s%x1f%cd',
        date='format:%Y-%m-%d %H:%M',
        max_count=50,
    )

    # Skip blank lines so a repository without commits yields an empty list.
    commit_list = [
        dict(zip(keys, line.split(field_sep)))
        for line in commit_log.split('\n')
        if line
    ]
    print(commit_list)
def branches_list():
    """Print and return the branch names of the default remote.

    The checkout at ``codes/fuck`` is updated with ``pull()`` first so the
    remote-tracking refs are current. The symbolic ``HEAD`` ref is excluded.

    :return: list of remote branch names (without the remote prefix)
    """
    pull()
    local_path = os.path.join('codes', 'fuck')
    repo = Repo(local_path)
    branches = repo.remote().refs
    branch_list = [item.remote_head for item in branches if item.remote_head != "HEAD"]
    # The result used to be computed and then dropped; print it like the
    # sibling helpers do and hand it back to the caller as well.
    print(branch_list)
    return branch_list
def checkout_to_branch(branch='dev'):
    """Check out ``branch`` in the ``codes/fuck`` checkout (``git checkout``).

    :param branch: name of the branch to switch to

    For debugging, ``repo.git.branch()`` can be printed before and after the
    checkout to see which branch is current.
    """
    Repo(os.path.join('codes', 'fuck')).git.checkout(branch)
def checkout_to_commit(commit='ec1d728'):
    """Hard-reset the ``codes/fuck`` checkout to ``commit``.

    Runs ``git reset --hard <commit>`` on whatever branch is currently
    checked out.

    :param commit: commit hash (abbreviated or full) to reset to
    """
    checkout = Repo(os.path.join('codes', 'fuck'))
    checkout.git.reset('--hard', commit)
封装到一个类中,以后当做工具。
import os
import json
from git.repo import Repo
from git.repo.fun import is_git_dir
class GitRepository(object):
    """Manage one local git checkout through GitPython.

    Creating an instance clones ``repo_url`` into ``local_path`` when no
    repository exists there yet; otherwise the existing checkout is opened.
    """

    # ``git log`` fields are separated by the ASCII unit separator (``%x1f``
    # in git's format language) rather than spliced into a JSON template, so
    # quotes or backslashes in a commit summary cannot corrupt the parsing.
    _FIELD_SEP = '\x1f'
    _LOG_FIELDS = ('commit', 'author', 'summary', 'date')
    _LOG_FORMAT = '--pretty=tformat:%h%x1f%an%x1f%s%x1f%cd'

    def __init__(self, local_path, repo_url, branch='master'):
        """
        :param local_path: directory holding the checkout,
            e.g. E:\\Django_issue\\code_issue\\codes\\xxx
        :param repo_url: remote repository URL,
            e.g. https://gite.com/zhanyu/xxx.git
        :param branch: branch to check out when the repository is cloned
        """
        self.local_path = local_path
        self.repo_url = repo_url
        # Populated by initial() below.
        self.repo = None
        self.initial(branch)

    def initial(self, branch):
        """
        Open the local repository, cloning it first if necessary.

        :param branch: branch passed to the clone; not used when a
            repository already exists at ``local_path``
        :return: None
        """
        # Make sure the target directory exists.
        if not os.path.exists(self.local_path):
            os.makedirs(self.local_path)

        # e.g. codes/luffycity/.git
        git_local_path = os.path.join(self.local_path, '.git')
        if not is_git_dir(git_local_path):
            # Nothing to pull from yet: the first fetch has to be a clone.
            self.repo = Repo.clone_from(self.repo_url, to_path=self.local_path, branch=branch)
        else:
            # A .git directory is already present: just open it.
            self.repo = Repo(self.local_path)

    def pull(self):
        """
        Fetch and merge the latest remote changes (``git pull``).

        :return: None
        """
        self.repo.git.pull()

    def branches(self):
        """
        List the branch names of the default remote.

        :return: list of branch names, excluding the symbolic ``HEAD`` ref
        """
        refs = self.repo.remote().refs
        return [ref.remote_head for ref in refs if ref.remote_head != 'HEAD']

    @classmethod
    def _parse_commit_log(cls, commit_log):
        """Turn the raw text produced with ``_LOG_FORMAT`` into a list of dicts."""
        return [
            dict(zip(cls._LOG_FIELDS, line.split(cls._FIELD_SEP)))
            for line in commit_log.split('\n')
            if line  # an empty log yields no records instead of an error
        ]

    def commits(self):
        """
        Return up to 50 of the most recent commits.

        :return: list of dicts with the keys ``commit`` (abbreviated hash),
            ``author``, ``summary`` and ``date`` (``YYYY-MM-DD HH:MM``)
        """
        commit_log = self.repo.git.log(self._LOG_FORMAT,
                                       max_count=50,
                                       date='format:%Y-%m-%d %H:%M')
        return self._parse_commit_log(commit_log)

    def tags(self):
        """
        List all tag names.

        :return: list of tag names
        """
        return [tag.name for tag in self.repo.tags]

    def change_to_branch(self, branch):
        """
        Switch the working tree to a branch (``git checkout <branch>``).

        :param branch: branch name
        :return: None
        """
        self.repo.git.checkout(branch)

    def change_to_commit(self, branch, commit):
        """
        Switch to ``branch`` and hard-reset it to ``commit``.

        :param branch: branch to check out first
        :param commit: commit hash to reset to (``git reset --hard``)
        :return: None
        """
        self.change_to_branch(branch=branch)
        self.repo.git.reset('--hard', commit)

    def change_to_tag(self, tag):
        """
        Check out a tag (``git checkout <tag>``).

        :param tag: tag name
        :return: None
        """
        self.repo.git.checkout(tag)
if __name__ == '__main__':
    # Demo: open the checkout under codes/luffycity, cloning it on first use.
    local_path = os.path.join('codes', 'luffycity')
    repo_object = GitRepository(
        local_path=local_path,
        repo_url='https://gitee.com/wupeiqi/xxoo.git',
    )
import shutil
# Create an archive (works on py2 and py3).
# The example is kept inside a bare string literal, so it is not executed.
"""
abs_file_path = shutil.make_archive(
base_name="files/ww", # 压缩包文件路劲
format='tar', # “zip”, “tar”
root_dir='codes/luffycity' # 被压缩的文件目录
)
print(abs_file_path)
"""
# Unpack an archive (py3 only).
# NOTE(review): both helpers below are underscore-prefixed shutil internals;
# shutil.unpack_archive() is the public entry point. The second line also
# hands a .zip path to the tar helper, which looks like a typo - confirm.
# shutil._unpack_zipfile('files/ww.zip', 'xxxxxx/new')
# shutil._unpack_tarfile('files/ww.zip', 'xxxxxx/new')
# Unpack an archive (py2 and py3) with zipfile / tarfile directly.
# Like the example above, this is a non-executed string literal.
# NOTE(review): the tarfile docs recommend tarfile.open() over constructing
# TarFile directly - confirm before reusing this snippet.
"""
import zipfile
z = zipfile.ZipFile('files/ww.zip', 'r')
z.extractall(path='xxxxxx/luffy')
z.close()
import tarfile
tar = tarfile.TarFile('code/www.tar', 'r')
tar.extractall(path='/code/x1/') # 可设置解压地址
tar.close()
"""
import paramiko
class SSHProxy(object):
    """Run commands and upload files over one shared paramiko ``Transport``.

    Intended to be used as a context manager: the connection is opened on
    entry and closed on exit, and any number of ``command`` / ``upload``
    calls can be made in between over the same connection.
    """

    def __init__(self, hostname, port, username, private_key_path):
        """
        :param hostname: host name or IP address of the SSH server
        :param port: SSH port
        :param username: login user
        :param private_key_path: path to the RSA private key file used to log in
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.private_key_path = private_key_path
        # Set by open(); None while no connection is established.
        self.transport = None

    def open(self):
        """Connect and authenticate with the RSA private key."""
        private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path)
        transport = paramiko.Transport((self.hostname, self.port))
        try:
            transport.connect(username=self.username, pkey=private_key)
        except Exception:
            # Do not leave a half-open transport behind on a failed login.
            transport.close()
            raise
        self.transport = transport

    def close(self):
        """Close the connection; does nothing if it is not open."""
        if self.transport is not None:
            self.transport.close()
            self.transport = None

    def command(self, cmd):
        """
        Execute ``cmd`` on the remote host.

        :param cmd: command line to run
        :return: everything the command wrote to stdout, as bytes
        """
        # Use a dedicated session channel and close only that channel.
        # Wrapping the transport in an SSHClient and calling its close()
        # would shut down the shared transport and break every later
        # command()/upload() call on this proxy.
        channel = self.transport.open_session()
        try:
            channel.exec_command(cmd)
            return channel.makefile('rb').read()
        finally:
            channel.close()

    def upload(self, local_path, remote_path):
        """
        Copy a local file to the remote host over SFTP.

        :param local_path: path of the file to send
        :param remote_path: destination path on the remote host
        """
        sftp = paramiko.SFTPClient.from_transport(self.transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            # Release the SFTP session even when the transfer fails.
            sftp.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
if __name__ == '__main__':
    # Run the same upload/command sequence against each host in turn; the
    # connection to a host is closed when its ``with`` block exits.
    for host in ('10.211.55.25', '10.211.55.26'):
        with SSHProxy(host, 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
            proxy.upload('xx', 'xx')
            proxy.command('ifconfig')
            proxy.command('ifconfig')
            proxy.upload('xx', 'xx')
import subprocess
# Run the shell's ``dir`` command in the given directory and print its output.
# The path is a raw string: sequences such as ``\w`` and ``\c`` are invalid
# escapes in a normal literal and trigger a warning on current Python.
result = subprocess.check_output('dir', cwd=r'D:\wupeiqi\code\ziwen\codes', shell=True)
# The output is decoded as GBK; check_output itself returns bytes.
print(result.decode('gbk'), type(result))
手机扫一扫
移动阅读更方便
你可能感兴趣的文章