Python实战之processing进程(多进程,进程池,lock锁,进程间的通讯)

Source

首先须知

什么是IO?

从硬盘读块数据,从网络读块数据属于IO操作(IO操作不占用cpu)

计算占用cpu,如:1+1

Python多线程其实就是一个线程,由于利用CUP的上下文切换看起来就像是并发..上下文切换消耗资源

Python多线程 不适合CPU密集操作型的任务,适合IO操作密集型的任务

大量运算占CPU尽量少用多线程,用单进程更快

sockeserver接收多个网络并发的就是IO操作密集型的

如果一定要使用CPU密集操作型的任务呢?Python怎么解决?

使用多进程来解决CPU密集操作型的任务。

Python的线程是调用操作系统的原生线程,进程也是调用操作系统的原生进程,原生进程是由操作系统自己维护的。Python只是调用了C代码库的一个接口启动进程的,真正的进程管理还是操作系统自己完成的

 

进程processing

1.程序是不能单独运行的,只能装载到内存,系统为他分配资源进行运行,这种运行程序称之进程

进程同一时间只能干一件事,每个进程只能控制CPU一个核,由于多核同时可以多个进程,由于计算机的速度太快,1秒干了N件事,让我们看起来就等于同时干了N件事

2.进程要操作CUP必须创建一个线程,进程本身是不可以执行的,通过线程才可以进行操作CPU

3.得出结论,进程至少要有一个线程,否则就无法执行

多线程简单版

__author__ = "Burgess Zheng"

import multiprocessing
import time
import threading

def thread_run():#测试线程用的
    print(threading.get_ident()) #threading.get_ident():显示线程号

def run(name):
    time.sleep(2)
    print("hello",name)
    t = threading.Thread(target=thread_run,)#实例化线程
    t.start()#启动线程

#循环启动多个进程并发
if __name__ == '__main__':
    for i in range(10):#循环启动10进程
        p = multiprocessing.Process(target=run,args=('bob%s'%i,))
        p.start()
#和线程格式一样的

执行结果:

 

父进程 主进程 子进程的原理以及获取进程号的试验

__author__ = "Burgess Zheng"

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
#win7: os.getppid获得父进程的PID #如果是主进程调用该函数打印结果:父进程是Pycharm(PID)
#linux: os.getpid得父进程的PID  #如果是主进程调用该函数打印结果:父进程是Linux根进程(PID1)
#得出结论:每个主进程都有一个固定的父进程:win7:pycharm(PIDx)   Linux:根进程(PID1)
#os.getppid获得父进程的PID #如果是子进程调用该函数打印结果:的父进程就是启动他的进程
#得出结论:每个字进程的父进程是启动该子进程的进程

    print('process id:', os.getpid())
#os.getpid 调用者自己的进程PID
#如果是主进程调用就是主进程的PID
#如果是子进程调用就是子进程的PID
    print("\n\n")


def f(name):
    info('\033[31;1mcalled from child process function f\033[0m')
    #子进程调用info函数 #该子进程的父进程是主进程
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')#主进程调用info函数
    p = Process(target=f, args=('bob',))#实例化一个子进程
    p.start()#启动该子进程
    # p.join()
"get_ProcessID.py" [New] 33L, 1301C written

执行结果:

进程lock

Without using the lock output from the different processes is liable to get all mixed up.

如果不使用来自不同进程的锁输出,则很容易混淆。

lock实战

[root@python Part_ten]# vim lock_process.py 
__author__ = "Burgess Zheng"

from multiprocessing import Process, Lock


def f(l, i):#第一个形参接收的是锁,第二个接收的是值
    l.acquire()
    print('hello world', i)
    l.release()
#不是说过进程不需要锁吗?现在为什么要锁?
#因为我们共享一个屏幕(win7试不出来)
#如果是linux 如果不加锁,由于共享一个屏幕:出现以下情况
# 可能看到的数据是:某个进程打印hello world 还没打印显示完该字符串,
# 另外个进程可能比较快,就造成了字符串显示混乱
#但是其实没多大影响,知识观看效果有影响而已
#Linux python2才会   Python3不会

if __name__ == '__main__':
    lock = Lock()#生产一个锁

    for num in range(10):
        Process(target=f, args=(lock, num)).start()#实例子进程,锁作为实参

执行结果:

 

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:1.apply   2.apply_async

进程池的作用:限定多少个子进程可以同时执行

 和 线程的Semaphore(信号量)允许同时最多几个线程同时执行一样

 

进程池试验

__author__ = "Burgess Zheng"

from  multiprocessing import Process, Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
    #该语句的意思是,如果你主动执行这个脚本,那么里面的代码就执行
    #如果其他的Python文件通过模块调用执行该脚本,不会执行里面的代码
    #注意:在window系统启动多线程就需要增加该语句,否则报错,Linux不用

    pool = Pool(processes=5) #允许进程池同时放入5个进程。简写:(5)
    print("主进程",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)#实例化pool子进程
            #callback=回调 .. 子进程执行完Foo函数后,进行回调执行Bar函数(但是主进程执行Bar函数)
            #callback=回调作用:假如主进程可以调取数据库,那么子进程就无需连接数据库 但也可以调取数据库
            #不然有1000个子进程,个个都需要连接数据库,效率就很低
        
        #pool.apply(func=Foo, args=(i,)) #实例化pool子进程 #apply:同步执行、串行
        #pool.apply_async(func=Foo, args=(i,))#实例化pool子进程 #apply_async:异步执行、并行
    print('end')
    pool.close() #一般正常是join在close  但是进程池有点特别需要,需要close在join,否则报错
    pool.join() #如果不加join是不会执行进程池实例化的子进程

执行结果:

进程间的通讯(QUEUES,PIPE,MANAGER)

Queues 实战

__author__ = "Burgess Zheng"

from multiprocessing import Process, Queue
import threading
import queue

'''
def f():
    q.put([42, None, 'hello'])
    #线程与线程之间可以共享数据
    #所以可以直接追加到主线程生成的线程队列容器
    
if __name__ == '__main__':
    q = queue.Queue()#生成线程队列容器
    p = threading.Thread(target=f,)#实例化线程
    p.start()#启动线程
    print(q.get()) #调取队列容器的数据 结果显示:"[42, None, 'hello']"
'''


#进程本身是不可以共享数据的,也不可以互相访问,但是可以通过进程Queue实现共享数据
def f(qq):
    print("in child:",qq.qsize())
    #主进程启动了子进程后开始执行  qq == q = Queue()
    #qq.qsize == q.qsize  该进程Queue队列容器是没有数据,所以打印结果是空
    qq.put([42, None, 'hello'])
    #q.put()追加一个列表数据进入进程Queue队列容器
    #这样的话主进程就可以取该数据了
    #我们都做到进程和进程之间是独立的不可通讯的,
    #但是可以通过进程Queue()实现进程之间的通讯
if __name__ == '__main__':
    q = Queue()#生产进程Queue队列容器:可传递给子进程达到进程之间的共享数据
    p = Process(target=f, args=(q,))#实例化子进程,并且把进程Q当做实参传递过去
    p.start()#启动子进程
    print(q.get())  # 调取进程Queue数据 结果显示:"[42, None, 'hello']"
    p.join()

     # prints "[42, None, 'hello']"
    #print(q.get())  # prints "[42, None, 'hello']"

#原理:两个进程两个进程Queue()队列容器(子进程复制了主进程的进程Queue()队列容器 )
#如果子进程p追加了数据到自身进程Queue队列容器里面
#主进程要获取获取到子进程p的进程Queue队列容器的数据
#获取过程:
#进程Queue本身就封装了序列化和反序列化的两个过程
#Queue先把子进程p的进程Queue队列容器里的数据进行pickle序列化后,
#Queue在进行pickle反序列化到主进程的进程Queue队列容器里
#实际不是共享一个Queue,是两个Queue所以才需要pickle
#实现了数据的传递,而不是修改同一份数据

执行结果:

Pipes实战

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

()函数返回一个对在管道连接的连接对象,默认情况下是双工(双向)。例如

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time

两个连接管()返回的对象代表的两端管道。每个连接对象发送()recv()方法(以及其他)。注意,管道中的数据可能会损坏如果两个进程(或线程)试图从磁盘读取或写入相同的管同时结束。当然没有腐败的风险过程使用不同的管道在同一时间

__author__ = "Burgess Zheng"

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello from child'])#发送数据给(父)管道另外一边
    print(conn.recv())#接收来自(父)管道数据
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()#生成管道,管道有两边
    p = Process(target=f, args=(child_conn,))#实例化一个子进程,把child_conn边当做实参
    p.start()#启动子进程
    print(parent_conn.recv())#接收来自(子)管道的另外一边
    #print(parent_conn.recv())  # 接收来自(子)管道的另外一边#如果该管道没有数据就一直卡住
    parent_conn.send('xiaoming')  # 发送数据给(子)管道另外一边
    p.join()

执行结果:

 

Managers实战:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.

一个manager模块对象返回值是由Manager ()控制服务器进程拥有的Python对象,并允许其他进程使用代理来操纵它们。
一个manager返回由Manager()支持的类型:如list( 列表),dict(字典),Naespace(名称空间),lock(),RLock(递归锁),信号量,BoundedSemaphore(类名),condition(条件),event(事件),barrier(障碍),queue(队列),value()array(数组)。例如,

[root@python Part_ten]# vim manager_process.py
__author__ = "Burgess Zheng"

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()#追加每个子进程的PID 到d字典
    l.append(os.getpid())#追加每个子进程的PID到l列表里面的
    print(l,d)#打印显示l列表和打印d字典


if __name__ == '__main__':
    with Manager() as manager: #Manager() = manager 这种格式也行
        d = manager.dict() #生产一个可再多个进程之间进行传递共享的字典

        l = manager.list(range(5))#生产一个可再多个进程之间进行传递共享的列表(默认该列表生产5个数字>
)
        p_list = []#子进程多且主进程需要等子进程执行完毕
                   #就添加个空列表.方便join
        for i in range(10):#循环10次等于生成10进程
            p = Process(target=f, args=(d, l))
#实例化子进程 把manager生产列表和字典作为参数传递给该子进程
            p.start()#启动子进程
            p_list.append(p)#所有的子进程追加到该空列表
        for res in p_list:#所有子进程列表导入
            res.join() #所有子进程等待执行完毕主进程才可以继续执行

        l.append("from parent")#列表追加from parent字符串
        print(d)#打印显示字典
        print(l)#打印显示列表

执行结果: