多进程TCP服务端并发- 进程join方法 - IPC机制
阅读原文时间:2023年07月08日阅读:3

目录

综合使用

同步阻塞
同步非阻塞
异步阻塞
异步非阻塞
# 这是CPU利用率最高,效率最高的运行方式


关键词 multiprocessing 模块 Process

from multiprocessing import Process
import time

def task(name):
    print('来自task', name)
    time.sleep(3)
    print('over', name)

if __name__ == '__main__':
    p1 = Process(target=task,args=('name',))
    # 创建异步子进程对象 执行程序为task,传入位置参数args=('name',)必须已远足方式传参
    p2 = Process(target=task,kwargs={'name1':'moon'})
        # 传家异步子进程对象,执行程序为task,关键字传参法kwargs=,必须已字典形式传参
    p1.start()
    print('主要')   

'''
当程序运行到 子进程 进行时 会创立一个单独空间去运行子程序,不影响主程序运行 主程序会接着运行
主要
来自task moon
over moon
'''

方式二:

class MyProcess(Process):
  # 创建一个类,继承Process的所有内容
    def __init__(self,name,age):
        super().__init__()
        # 调用父类的信息 新增内容
        self.name = name
        self.age = age

    def run(self):
        print('来自子程序',self.name,self.age)

if __name__ == '__main__':
    obj = MyProcess('moon',18)
    obj.start()
    print('主')

 '''
 面向对象思想。创建类 生产对象调用子进程
 '''



同一台计算机上的多个进程数据是严格意义上的物理隔离的,默认情况下

from multiprocessing import Process
import time

m1 = 1000

def run():
    global m1
    m1 = 666
    print(m1)

if __name__ == '__main__':
    p1 = Process(target=run,)
    p1.start()
    time.sleep(2)
    # 子进程已经执行结束了,正常m1应该被重复赋值了
    print(m1)
    # 1000 但是结果是没有,因为子进程数据独立,不影响主进程数据


1.什么是join方法
        可以是控制为子进程在运行完成后再进行主进程

from multiprocessing import Process
import time

def tack():
    print('我是子进程')
    time.sleep(3)
    print('等待2秒')

def tack1():
    print('我是子进程2')
    time.sleep(6)
    print('等待3秒2')

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=tack,)
    p2 = Process(target=tack1,)
    p1.start()
    # 子进程开始进行
    p2.start()
    # 子进程开始进行
    p1.join()
    # 子进程需要3秒运行结束
    p2.join()
    # 在等待p1时 p2也在进行中
    print('主进程')
    # 因为使用了join方法,主进程需要等对应的子进程运行结束才可以执行
    print(time.time() - start_time)
    # 所以最用用时为 6秒 而不是 6+3=9 

    # p.start()  # 异步
    '''主进程代码等待子进程代码运行结束再执行'''
    # p.join()
    # print('主')


IPC:进程间通信
消息队列:所有线程程序都可以存,也都可以取 (队列:先进先出)

关键词 Queue 

from multiprocessing import Queue

Q = Queue(3)
# 类产生一个对象(消息队列),括号内是可以设定贮存数据的最大额度

Q.put(111)
Q.put(222)
# put 可以向队列中添加数据
# 如果存满了也会挺着 等待到能存进去为止

print(Q.full())
# full 可以判断队列数据是否已满 返回布尔值

print(Q.get())
# get 可以向队列中取值 队列默认先进先出原则
# 如果取不到会挺着 等待取到为止 也不报错

print(Q.get(timeout = 3))
# 等待3秒 如果还取不到就报错

print(Q.get_nowait())
# get_nowait 向队列中取值,立即就要 取不到就报错

print(Q.empty())
# empty 判断队列是否为空 返回布尔值

可以配合捕捉错异常
try:
    print(q.get(timeout=3))
    # 如果取不到值3秒后会报错
except Exception as e:
    print('队列中暂无数据')

"""
full() empty() get_nowait()在多进程中都不能使用!!!
因为在多线程情况下可能出现同时间的存取
"""


只是一个编程思维概念
举例:爬虫工程师

生产者:
   负责生产数据的人,把你需要的数据都给你 

消费者:
      负责处理数据的人,拿到提供的数据然后二次加工

该模型除了有生产者和消费着之外还必须有消息队列(只要是能提供数据保存服务和提取服务的理论上都可以)


'''
一台计算机上的每一个进程都会有自己的PID号,也就是进程号
如何查看进程号:
    windows电脑 cmd输入 tasklist 命令 即可查看所有
    mac电脑  终端输入  ps aux
'''

from multiprocessing import Process,current_process
import time
import os

def task():
    print(current_process().pid)
    # 查看当前子进程号
    print(os.getpid())
    # 查看当前子进程号
    time.sleep(5)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.terminate() # 杀死这个进程
    time.sleep(0.1)
    print(p.is_alive())  # 查看进程是否存活
    # print('主',current_process().pid)
    print('主',os.getpid())
    # 查看当前主进程号
    print('主主',os.getppid())
    # 查看当前主进程号父级号

# os.getpid()
# current_process().pid
# # 这两个方法都是查看当前所在进程下的pid号码
# os.getppid()
# # 这个是查看当前进程的父进程pid号码


僵尸进程
   进程执行完毕后并不会立刻销毁所有数据 会有一些信息短暂保留下来
   比如 进程号进程执行时间, 进程耗费功力等给父进程查看
   父进程等待子进程运行结束,回收进程号
   ps:所有的进程在关闭时都会有一段时间为僵尸进程,在僵尸进程期间
   进程号不释放

孤儿进程
   子进程正常运作,父进程意外死亡
   操作系统针对孤儿进程会派遣福利院管理


from multiprocessing import Process
import time

# 关键词 .daemon 守护进程

def task(name):
    print('活着',name)
    time.sleep(3)
    print('死去',name)

if __name__ == '__main__':
    p = Process(target=task,args=('moon',))
    # p = Process(target=task,kwargs={'name':'moon'})
    p.daemon = True
    # 将子进程 P 设置成为守护进程
    # 必须要在子进程开始前设置好
    p.start()
    print('主进程结束')


服务端:

import socket
from multiprocessing import Process

def talk(sock):
    while True:
        msg = sock.recv(1024)
        print(msg.decode('utf8'))
        sock.send(msg.upper())

if __name__ == '__main__':
    server = socket.socket()
    server.bind(('192.168.1.99', 8888))
    # 确保次代码不能循环,因为地址只能绑定一次
    server.listen(5)
    while True:
    #循环创建子进程
        print('等待连接中...')
        sock, addr = server.accept()
        print(f'用户:{addr[0]}已接入')
        p = Process(target=talk, args=(sock,))
        # 创建子进程 异步处理
        p.start()

'''
循环创建子进程进行接收 对话   每创建一个子进程后 再次进入等待接入状态
'''

客户端:

import socket

client = socket.socket()
client.connect(('192.168.1.99', 8888))

while True:
    my_msg = input('请输入您要发送的内容')
    client.send(my_msg.encode('utf8'))
    msg = client.recv(1024)
    print(msg.decode('utf8'))