同步与异步 multiprocessing 进程对象多种方法
阅读原文时间:2023年07月10日阅读:3

目录

同步与异步

用来表达任务的提交方式

同步:

提交完任务之后原地等待任务的返回结果 期间不做任何事

异步:

提交完任务之后不愿地等待任务的返回结果 直接去做其他事 有结果自动通知

用来表达任务的执行状态

阻塞

程序处于阻塞态

非阻塞

程序处于就绪态、运行态

  1. 同步阻塞

    提交任务之后 cpu走了 进程不执行了

  2. 同步非阻塞

    在原地做一些事情

  3. 异步阻塞

    cpu没来做不了事情

  4. 异步非阻塞(效率最高)

    提交完任务之后 进程还继续 cpu也还在

异步非阻塞框架(写游戏、响应速度、效率高)

创建进程的多种方式

# 1.创建进程的步骤
如何创建进程?用鼠标双击一个桌面图标,就创建了一个应用程序的进程。
而我们可以用python代码创建进程,这其中至少有这样的过程:
1.硬盘中存放python代码
2.读取代码到内存
3.cpu执行代码
4.操作系统创建新进程

# 2.不同操作系统的差异(重要)
因为进程是由操作系统创建的,所以根据不同操作系统也会有差异:
"""
在不同的操作系统中创建进程底层原理不一样
    windows
        以导入模块的形式创建进程(需要使用if __name__ == 'main')
    linux/mac
        以拷贝代码的形式创建进程 (复制的时候不包含创建进程的代码)
"""

# 3.父进程和子进程
比如一个浏览器,可能会有很多分页。那相对的来说浏览器就是主进程,一个页面就是一个子进程。
一个py文件,里面写了创建进程的代码。那这个py文件运行之后,首先他自己是一个进程(父进程),被他创建出来的进程是他的子进程。
父进程和子进程是相对的概念,比如刚刚所说的py文件,又是pycharm的子进程。

在windows系统下使用Python模块multiprocessing模块创建进程:

from multiprocessing import Process
import time

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

p = Process(target=task)  # 使用Process模块创建一个子进程 在子进程运行task函数
p.start()  # 运行子进程

print('主进程运行结束')

# 执行后会产生如下报错信息:
'''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

Process finished with exit code 0
'''

为什么会产生这样的报错呢?

windows以导入模块的形式创建进程。

所以产生报错的就是如下两行代码:

解决:

from multiprocessing import Process
import time

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
'''
将这两行代码放入if __name__ == 'main':
使主进程作为模块导入的时候,不再执行这两行代码,就不会出现问题了。
'''

展现异步

展现异步之前,先举一个同步的例子:

import time

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    task()   # 程序会跳回去执行task函数,task没执行完就不会执行下面的print()
    print('主进程运行结束')

'''输出结果:
子进程开始执行
子进程执行结束
主进程运行结束
'''

展现异步:

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主进程运行结束') # 这行代码执行最快 主进程真的结束了吗? =。=

'''输出结果:
主进程运行结束
子进程开始执行
子进程执行结束
'''

为什么是这样的输出结果?

p.start()执行完后,相当于向操作系统发送了一个信息,要创建一个子进程。操作系统要处理信息是需要一定时间的。主进程只是发了一个信息就完了,代码继续往下走,自然会执行print。等操作系统反应过来了,子进程才会开始执行,而这个速度是慢于主进程代码执行速度的。也就是说: 操作系统开辟内存空间的时候 主进程不停着,直接运行下一行。

请解释输出结果:

请解释输出结果:

创建进程的方式(一):使用Process()创建进程对象

基本使用

import time
from multiprocessing import Process

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    p = Process(target=task)
    print(p, type(p))  # <Process(Process-1, initial)> <class 'multiprocessing.context.Process'>
    p.start()
# 用Process创建一个子进程对象 然后调用子进程对象的start()方法运行子进程。

查看Process源码发现他继承BaseProcess类:

BaseProcess类:看红色框就好。

给子进程运行的函数传参

import time
from multiprocessing import Process

def task1(a,b):
    time.sleep(3)
    print(a,b)

def task2(a,b):
    time.sleep(3)
    print(a,b)

if __name__ == '__main__':

    p1 = Process(target=task1, args=('cloud','alice'))  # 位置参数
    p2 = Process(target=task2, kwargs={'a':'cloud','b':'alice'})  # 关键字参数
    p1.start()
    p2.start()

创建进程的方式(二):重写Process类的run方法

import time
from multiprocessing import Process

class MyProcess(Process):
    def run(self):
        print('run is running')
        time.sleep(3)
        print('run is over')

if __name__ == '__main__':
    obj = MyProcess()  # 产生进程对象
    obj.start()  # 此时就会开一个子进程 子进程会去运行类中的run方法
    print('主')

为什么可以这么做呢?我们知道Process继承BaseProcess类。于是我们去看BaseProcess类的run方法:

可以得知这个run方法就是留给我们重写的。

还是如何传参

需要重写BaseProcess类的双下init:

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, name, age):  # 重写__init__方法
        super().__init__()  # 这里init不传参是因为 父类init里面的形参全是默认参数
        self.name = name  # 这里的self是进程对象
        self.age = age  # 给新产生的进程对象新增两个对象独有的属性name、age
                        # 备忘:__new__产生新对象

    def run(self):
        print('run is running', self.name, self.age)  # 使用对象中的属性
        time.sleep(3)
        print('run is over', self.name, self.age)

if __name__ == '__main__':
    obj = MyProcess('cloud', 18)
    obj.start()
    print('主')

join方法

join方法的作用是:让主进程代码等待子进程代码结束之后,再执行。

主进程会卡在join方法处,主进程join方法下面的代码,不会执行。直到子进程运行结束。

# 1.基本使用
import time
from multiprocessing import Process

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print('主')

'''输出结果:
子进程开始执行
子进程执行结束
主
'''
使用join方法会让主程序等待子进程结束。不加join时,应该是'主'最先被输出。


# 2.非阻塞例子
from multiprocessing import Process
import time

def task(name, n):
    print('%s is running' % name)
    time.sleep(n)
    print('%s is over' % name)

if __name__ == '__main__':
    p1 = Process(target=task, args=('jason1', 1))  # 子进程p1 执行时间为1s
    p2 = Process(target=task, args=('jason2', 2))  # 子进程p2 执行时间为2s
    p3 = Process(target=task, args=('jason3', 3))  # 子进程p3 执行时间为3s
    start_time = time.time()
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print(time.time() - start_time)  # 请问运行时间是?

'''
jason1 is running
jason2 is running
jason3 is running
jason1 is over
jason2 is over
jason3 is over
3.2216908931732178
'''


# 3.阻塞例子
from multiprocessing import Process
import time

def task(name, n):
    print('%s is running' % name)
    time.sleep(n)
    print('%s is over' % name)

if __name__ == '__main__':
    p1 = Process(target=task, args=('jason1', 1))  # 子进程p1 执行时间为1s
    p2 = Process(target=task, args=('jason2', 2))  # 子进程p2 执行时间为2s
    p3 = Process(target=task, args=('jason3', 3))  # 子进程p3 执行时间为3s
    start_time = time.time()
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print(time.time() - start_time)  # 请问运行时间是?

'''
jason1 is running
jason1 is over
jason2 is running
jason2 is over
jason3 is running
jason3 is over
6.449279308319092
'''


# 4.谁的速度快?
import time
from multiprocessing import Process

def task(name):
    start_time = time.time()
    print(f'{name}子进程开始执行')
    time.sleep(3)
    print(f'{name}子进程执行结束')
    print(f'{name}进程耗时:{time.time()-start_time}')

if __name__ == '__main__':
    p1 = Process(target=task,args = ('p1',))
    p2 = Process(target=task,args = ('p2',))

    p1.start()
    p2.start()
    p1.join()
    print('主')  # 这段print代码有时可以跑的比第二个子进程还快!


# 5.无法阻挡子进程
import time
from multiprocessing import Process

def task(name):
    start_time = time.time()
    print(f'{name}子进程开始执行')
    time.sleep(3)
    print(f'{name}子进程执行结束')
    print(f'{name}进程耗时:{time.time()-start_time}')

def task2(name):
    start_time = time.time()
    print(f'{name}子进程开始执行')
    time.sleep(1)
    print(f'{name}子进程执行结束')
    print(f'{name}进程耗时:{time.time()-start_time}')

if __name__ == '__main__':
    p1 = Process(target=task,args = ('p1',))
    p2 = Process(target=task2,args = ('p2',))

    p1.start()
    p2.start()
    p1.join()  # join只能卡住主进程的代码运行 不能阻挡子进程
    print('主')

'''
p1子进程开始执行
p2子进程开始执行
p2子进程执行结束
p2进程耗时:1.0048680305480957
p1子进程执行结束
p1进程耗时:3.0130884647369385
主
'''

进程间的数据隔离

同一台计算机上的多个进程数据是严格意义上的物理隔离(默认情况下)

from multiprocessing import Process
import time

money = 1000

def task():
    global money
    money = 666
    print('子进程的task函数查看money', money)

if __name__ == '__main__':
    p1 = Process(target=task)
    p1.start()  # 创建子进程
    time.sleep(3)  # 主进程代码等待3秒
    print(money)  # 主进程代码打印money

IPC机制(进程间通信) 消息队列

IPC的概念:实现进程间通信

消息队列:消息队列可以理解成一个公共存数据的地方 所有进程都可以存 也可以取。

from multiprocessing import Queue
# 1.产生消息队列q
q = Queue(3)  # 括号内可以指定存储数据的个数

不给Queue传值的情况下,会自动使用最大值:(SEM_VALUE_MAX)

此时队列可以容纳值的数量为:

get() put()

使用get从队列中获取值 使用put往队列添加值 符合先进先出

from multiprocessing import Queue

q = Queue(3)
q.put(111)
q.put(222)
q.put(333)
print(q.get()) # 111
print(q.get()) # 222
print(q.get()) # 333

当队列已满时,你使用put添加值

或者队列已空,你使用get获取值

都会导致当前进程阻塞,等待有别的进程往队列里添加/获取值

from multiprocessing import Queue

q = Queue(3)
q.put(111)
q.put(222)
q.put(333)
q.put(444)
print('阻塞')  # 无法输出这行代码

full() empty()

full 用于判断队列是否为满。

但有些需要注意:

1.判断的是当前进程的队列

2.判断的是执行full()这行代码这个瞬时时间下,队列的状态是否为满

empty 与 full 相反 判断的是队列是否为空

from multiprocessing import Queue

q = Queue(3)

q.put(111)
print(q.full())  # 判断队列是否已满  False
q.put(222)
q.put(333)
print(q.full())  # 判断队列是否已满  True
print(q.get())
print(q.get())
print(q.empty())  # 判断队列是否为空  False
print(q.get())
print(q.empty())  # 判断队列是否为空  True

注意:

full() empty() 在多进程中都不能使用!!!

可能当前进程你执行完q.empty 之后 马上又另外的进程塞一个数据进来 q.empty只能判断当前进程一个瞬时时间管道是否空。

get_nowait()

这个方法如他的名字,如果获取不到队列的值,就马上抛出异常。

from multiprocessing import Queue

q = Queue(3)
q.put(111)
print(q.get())
print(q.get_nowait())  # queue.Empty

消息队列实现子进程消息传递

from multiprocessing import Process, Queue

def product(q):
    q.put('子进程p添加的数据')

def consumer(q):
    print('子进程获取队列中的数据', q.get())

if __name__ == '__main__':
    q = Queue()
    # 主进程往队列中添加数据
    # q.put('我是主进程添加的数据')
    p1 = Process(target=consumer, args=(q,))
    p2 = Process(target=product, args=(q,))
    p1.start()
    p2.start()
    print('主')
'''
consumer进程在消息队列没有数据的时候 这个进程会等待product进程往Queue放东西
'''

消费者模型

"""回想爬虫"""
生产者
    负责产生数据的'人'
消费者
    负责处理数据的'人'

该模型除了有生产者和消费者之外还必须有消息队列
(只要是能够提供数据保存服务和提取服务的理论上都可以)

生产者消费者模型可以实现程序结耦合。两个程序(生产者、消费者)基于消息队列都可以独立运行。
用包子铺举例:
老板提前做好包子放在蒸笼(消息队列)。
没有客人的时候可以一直做包子。客人也不需要等待,随时可以来拿包子。(程序结耦合)

进程对象多种方法

# 1.current_process查看
from multiprocessing import Process, current_process

def task():
    print('子进程')
    print(current_process())  # 查看进程
    print(current_process().pid)  # 查看进程号

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主进程:')
    print(current_process())
    print(current_process().pid)

'''
主进程:
<_MainProcess(MainProcess, started)>
504
子进程
<Process(Process-1, started)>
23260
'''


# 2.os模块
import os

print(os.getpid())  # 获取当前进程的进程号  # 17672
print(os.getppid())  # 获取当前进程的父进程的进程号  # 28600  # 我用的是pycharm应该获取的是pycharm的进程号

去cmd里输入tasklist查看,果然如此:

1.终止进程
    p1.terminate()
    ps:计算机操作系统都有对应的命令可以直接杀死进程
    windows:taskkill /F /PID 进程号
2.判断进程是否存活
    p1.is_alive()
3.start()
4.join()


# 例子
import time
from multiprocessing import Process

def task():
    print('子进程开始执行')
    time.sleep(3)
    print('子进程执行结束')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.terminate()
    print(p.is_alive())  # True  # 不是已经终止进程了吗?为什么还是True

# 1.前脚刚开 后脚就关了 这时候进程都起不来 如果加中间sleep可能就子进程运行完了
# 2.执行terminate相当于让操作系统关掉刚刚创建的子进程,而这是需要时间的。可能执行is_alive的速度比操作系统关进程的速度快,所以结果是True

守护进程

守护进程会随着守护的进程结束而立刻结束
使用场景:
一键关闭所有子进程

eg: 吴勇是张红的守护进程 一旦张红嗝屁了 吴勇立刻嗝屁

from multiprocessing import Process
import time

def task(name):
    print('德邦总管:%s' % name)
    time.sleep(3)
    print('德邦总管:%s' % name)

if __name__ == '__main__':
    p1 = Process(target=task, args=('大张红',))
    p1.daemon = True
    p1.start()
    time.sleep(1)
    print('恕瑞玛皇帝:小吴勇嗝屁了')

僵尸进程

僵尸进程
    进程执行完毕后并不会立刻销毁所有的数据 会有一些信息短暂保留下来
     比如进程号、进程执行时间、进程消耗功率等给父进程查看
     ps:所有的进程都会变成僵尸进程

孤儿进程

孤儿进程
    子进程正常运行 父进程意外死亡 操作系统针对孤儿进程会派遣福利院管理

多进程数据错乱问题

火车票抢票时经常出现这种问题:

本来有3张票 点进去之后一张票都没有

是因为:你看到的这三张票基于 进入app的时间

你需要刷新 才能获取当前时间的信息

模拟抢票软件
from multiprocessing import Process
import time
import json
import random

# 查票
def search(name):
    with open(r'data.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    print('%s在查票 当前余票为:%s' % (name, data.get('ticket_num')))

# 买票
def buy(name):
    # 再次确认票
    with open(r'data.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    # 模拟网络延迟
    time.sleep(random.randint(1, 3))
    # 判断是否有票 有就买
    if data.get('ticket_num') > 0:
        data['ticket_num'] -= 1
        with open(r'data.json', 'w', encoding='utf8') as f:
            json.dump(data, f)
        print('%s买票成功' % name)
    else:
        print('%s很倒霉 没有抢到票' % name)

def run(name):
    search(name)
    buy(name)

if __name__ == '__main__':
    tick_dict = {'ticket_num': 1}  # 创建车票字典 里面就只有一张票
    with open('data.json', 'w', encoding='utf8') as f:
        json.dump(tick_dict, f)

    for i in range(10):
        p = Process(target=run, args=('用户%s'%i, ))
        p.start()

"""
多进程操作数据很可能会造成数据错乱>>>:互斥锁
    互斥锁
        将并发变成串行 牺牲了效率但是保障了数据的安全
        也就是让进程排队
"""

输出结果:

练习

1.将TCP服务端使用多进程实现并发效果
    聊天全部采用自动发送 不要用input手动输
2.整理今日内容及博客
3.查询IT行业可能出现的锁名称及概念
4.整理理论内容 尝试编写cs架构的软件 实现数据的上传与下载


'''将TCP服务端使用多进程实现并发效果
    聊天全部采用自动发送 不要用input手动输'''
import socket
from multiprocessing import Process

def welcome(sock, addr):
    data = sock.recv(1024)
    print(f'from:{addr} msg:{data}')
    sock.send(b'welcome')

server = socket.socket()
server.bind(('192.168.1.81', 8081))
server.listen(5)
while True:
    sock, addr = server.accept()
    print(addr)
    # p = Process(target=welcome, args=(sock, addr))  # 在这里加这行代码会报错!
    # OSError: [WinError 10048] 通常每个套接字地址(协议/网络地址/端口)只允许使用一次。
    # p.start()  # 为什么在这里起一个进程会报错呢? 因为windows是导模块 这里起一个进程 会把server.bind重新运行一遍 我的天!


'''将TCP服务端使用多进程实现并发效果
    聊天全部采用自动发送 不要用input手动输'''

'''服务端'''
import socket
from multiprocessing import Process

def welcome(sock, addr):
    while True:  # 4.重复接受客户端信息并回复
        try:  # 5.window下客户端断开会抛出异常 捕获此异常
            data = sock.recv(1024)
            print(f'from:{addr} msg:{data.decode("utf8")}')
            sock.send(b'welcome')
        except Exception:
            break

if __name__ == '__main__':  # 1.很重要 防止使用process创建进程时 将主进程代码重复运行 导致报错
    server = socket.socket()
    server.bind(('192.168.1.81', 8081))
    server.listen(5) # 2.半连接池 定义最多等待处理5个请求
    while True:
        sock, addr = server.accept()
        p = Process(target=welcome, args=(sock, addr))  # 3.每次接受请求,就开一个进程去处理 主进程回到accept等待
        p.start()


'''客户端'''
import socket

cilent = socket.socket()
cilent.connect(('192.168.1.81', 8081))
while True:
    user_input = input('please input >>>')
    if user_input == 'q':
        break
    cilent.send(user_input.encode('utf8'))
    print(cilent.recv(1024))


tasklist # 任务管理器
netstat -aon|findstr 8080  # 查找端口号