示例:
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.request import Request
from urllib.request import quote
import json
import math
import os
class DownLoader(object):
    """Download a remote file as parallel byte-range parts and merge them.

    The file is split into ``part_size``-byte ranges. Each range is fetched
    by a worker thread into ``<name>.<n>`` in the current working directory,
    and the parts are concatenated into ``<name>`` once every part succeeded.
    """

    def __init__(self):
        self.part_size = 1024 * 1024 * 10  # bytes requested per part
        self.part_thread_num = 10  # maximum number of concurrent part downloads
        self.BUFFER_SIZE = 64 * 1024  # bytes per read call when streaming

    def download_part(self, encode_url, part_filename, offset, end_bytes):
        """Fetch one byte range of the remote file into a local part file.

        :param encode_url: URL-encoded address of the remote file
        :param part_filename: name of the local file that receives this part
        :param offset: first byte to request (inclusive)
        :param end_bytes: last byte to request (inclusive)
        :return: ``{part_filename: ok}``; ``ok`` is True only when the number
            of bytes received equals the size of the requested range
        """
        # Build the Range request header for this part.
        range_header = {
            'Range': 'bytes=%s-%s' % (offset, end_bytes)
        }
        print(range_header)
        expected_file_size = end_bytes - offset + 1
        part_req = Request(encode_url, headers=range_header)

        # Count the bytes as they are written instead of stat-ing the part
        # file, so the check does not depend on buffered data being flushed.
        received_size = 0
        with open(part_filename, 'wb') as local_part_fd:
            with urlopen(part_req) as req_fd:
                # read() returns b'' once the response body is exhausted.
                for data in iter(lambda: req_fd.read(self.BUFFER_SIZE), b''):
                    local_part_fd.write(data)
                    received_size += len(data)

        cur_task_ret = received_size == expected_file_size
        if cur_task_ret:
            print('%s 与预期块儿文件大小相符' % part_filename)
        else:
            print('%s 与预期块儿文件大小 不符,预期%s字节,实际得到%s 字节' % (
                part_filename, expected_file_size, received_size))
        return {part_filename: cur_task_ret}

    def download(self, url):
        """Download ``url`` into the current directory using parallel parts.

        The local file is named after the last path component of ``url``.
        Part files are removed only when the merged file has the size the
        server announced; otherwise they are left in place.

        :param url: address of the remote file (not yet URL-encoded)
        :raises RuntimeError: if the server reports no Content-Length, if a
            part result is missing, or if any part has the wrong size
        """
        finally_filename = os.path.basename(url)
        # Percent-encode the URL while keeping its structural characters.
        encode_url = quote(url, safe=";/?:@&=+$,")
        print(encode_url)

        # Open the resource once to learn its total length. The ``headers``
        # mapping is available on every response object urlopen() returns.
        req = Request(encode_url)
        with urlopen(req) as fp:
            print(list(fp.headers.items()))
            length = fp.headers.get('Content-Length')
        if length is None:
            # Without a total size the byte ranges cannot be computed.
            raise RuntimeError("%s: response has no Content-Length header" % url)
        length = int(length)
        print(type(length))
        print('length:', length)

        chunk_size = self.part_size
        # Integer ceiling division: number of parts needed to cover the file.
        chunk_count = (length + chunk_size - 1) // chunk_size

        # One (url, part name, first byte, last byte) tuple per part.
        pool_args_list = []
        for i in range(chunk_count):
            offset = chunk_size * i
            end_bytes = min(offset + chunk_size, length) - 1
            part_filename = finally_filename + '.' + str(i + 1)
            pool_args_list.append((encode_url, part_filename, offset, end_bytes))

        # Run all part downloads on the pool. The context manager joins the
        # worker threads even when a part raises.
        multi_chunk_download_result = {}
        with ThreadPoolExecutor(max_workers=self.part_thread_num) as tp:
            thread_list = [tp.submit(self.download_part, *args)
                           for args in pool_args_list]
            for part_thread in as_completed(thread_list):
                multi_chunk_download_result.update(part_thread.result())

        print('下载总结')
        # Every part must have reported a result.
        if len(multi_chunk_download_result) != chunk_count:
            raise RuntimeError(
                "%s part miss,expect=%d,actual=%d" % (
                    finally_filename, chunk_count,
                    len(multi_chunk_download_result)))
        # Every part must have passed its size check.
        for part_filename, part_ok in multi_chunk_download_result.items():
            if not part_ok:
                raise RuntimeError("%s part download has failed" % part_filename)

        # All parts are good: concatenate them in order into the final file.
        with open(finally_filename, 'wb') as local_fd:
            for i in range(chunk_count):
                part_filename = finally_filename + '.' + str(i + 1)
                with open(part_filename, 'rb') as part_fd:
                    for bytes_data in iter(
                            lambda: part_fd.read(self.BUFFER_SIZE), b''):
                        local_fd.write(bytes_data)

        # Compare sizes only after the merged file has been closed.
        final_size = os.stat(finally_filename).st_size
        if length == final_size:
            print('%s 下载完成,文件大小相符' % finally_filename)
            for part_filename in multi_chunk_download_result:
                os.remove(part_filename)
        else:
            print('%s 下载完成,但大小不符,content_length:%s 下载后大小 %s' % (
                finally_filename, length, final_size))
if __name__ == '__main__':
    # Script entry point: download the sample file into the current directory.
    downloader = DownLoader()
    url = 'https://ks3-cn-beijing.ksyun.com/zhangmingda/111-3333333.Python安装与命令行操作.mp4'
    print(url)
    downloader.download(url)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章