异步编程


进程间的通信

— 进程间通信(IPC, inter-Process Communication) 是指在不同进程间传播或交换信息
— IPC的方式通常有管道(包括无名管道和命名管道),消息队列, 信号量, 共享存储,socket,streams等

本地进程间的通信-本地程序之间交互 如QQ截图到微信
远程进程的通信-如上网, web服务

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动, 进程是系统进行资源分配和调度的一个独立单位。每个进程都有自己的独立内存空间, 不同进程通过进程间通信来通信。

进程间通信的几种方式

管道, 通常指无名管道, 是unix系统ipc最古老的形式

1 提示半双工的(即数据只能在一个方向上流动), 具有固定的读端和写端
2 它只能用于具有亲缘关系的进程间通信(也是父子进程或者兄弟进程直接)
3 它可以看成是一中特殊的文件对于它的读写也可以使用普通的read write等函数 但是它不是普通的文件,并不属于其他任何文件系统, 并且只存在于内存中
通过系统函数int pip(int_pipedes[2])进行创建

FIFO, 命名管道,一种特殊的文件类型

1 FIFO可以在无关的进程之间交换数据, 与无名管道不同。
2 FIFO有路径名与之相关联,它以一种特殊设备文件形式存在于文件系统中。命名管道,FIFO,是一种特殊的文件类型
命名管道有mkfifo函数创建,打开用open
fifo与pipe 之间唯一的区别在他们创建与打开的方式不同,这些工作完成之后,他们具有相同的语义

消息队列

是消息的连接表,存放在内核中,一个消息队列由一个标识符(即队列ID)来标识
1 消息队列是面向记录的,其中的消息具有特定的格式以及特定的优先级
2 消息队列独立与发送与接收进程。 进程终止时, 消息队列及其内容并不会被删除。
消息队列可以实现消息的随机查询, 消息不一定要以先进先出的次序读取, 也可以按消息的类型读取。

信号量

信号量(semaphore)与已经介绍过的IPC结构不同, 他是一个计数器。信号量用于实现进程间的互斥与同步,而不是用于存储进程间的通信数据。

1 信号量用于进程间同步,若要在进程间传递数据需要结合共享内存
2 信号量基于操作系统的PV操作, 程序对信号量的操作都是原子操作。
3 每次对信号量的PV操作不仅限于对信号量值加1或减1, 而且可以加减任意正整数
4 支持信号量组

共享存储

共享内存(shared Memory)指两个或多个进程共享一个给定的存储区。
1 共享内存是最快的一种IPC, 因为进程是直接对内存进行存取
2 因为多个进程可以同时操作,所以需要进行同步
3 信号量+共享内存通常结合在一起使用,信号量用来同步对共享内存的访问

*PV操作 这是狄克斯特拉用荷兰文定义的, 因为在荷兰文中,通过叫passeren,释放叫VRIJGEVEN,pv操作因此得名。

*原子操作 如果这个操作所在的层(layer)的更高层不能发现其内部实现与结构,那么这个操作是一个原子(atomic)操作。
原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中一部分。

进程的基本特性:
动态性 并发性 独立性 异步性

进程控制块

进程控制块(Processing Control Block) 也成进程描述块(Process Descriptor)是操作系统核心
中一种数据结构,主要表示进程状态,在创建进程时,首先建立PCB,它伴随进程运行的全过程,直到进程撤销而报销,系统可以利用PCB来控制和管理进程所以说PCB(进程控制块)是系统感知进程存在的唯一标志。

UNIX系统中几个进程控制操作

fork():通过复制调用进程来建立新的进程,是最基本的进程建立过程,
exec(): 包括一系列的系统调用,它们是通过用一段新的程序代码覆盖原来的地址空间,实现进程执行代码的转换。
wait(): 提供初级的进程同步操作,能使一个进程等待另外一个进程的结束
exit(): 用来终止一个进程的执行

fork *创建进程昂贵,消耗资源
IPC *父进程向子进程传递数据容易, 子到父难

线程:lightweight process

线程

进程的一个可调度的实体
CPU调度的基本单位
不拥有系统资源
消耗资源少
多线程可以相互交互

线程之间共享

进程指令
大多数数据
打开的文件(描述符)
信号处理函数和信号处理
用户ID和组ID

线程的独立信息

线程ID
寄存器集合(程序计数器和栈指针)
栈(用于存放局部变量和返回地址)
erron
信号掩码
优先级

线程的创建

系统底层的pthread_create创建线程
在POXIS规范的系统中, 称为pthread

线程 VS 进程

地址空间 线程是进程的执行单位 进程至少有一个线程 线程共享进程地址空间, 进程有独立的空间

资源使用 进程是资源分配的单位

CPU调度 线程是调度的单位

并发 都可并发

是否独立 线程依存进程

IO 模型

阻塞式 非阻塞式 IO复用 信号驱动式 异步IO

I/O 相关的操作分两个阶段
1 从内核等待数据
2 从内核向进程复制数据

阻塞式I/O

非阻塞式I/O

 

I/O 复用

适用场景

当客户处理多个描述符(一般是交互式输入或网络套接字),必须适用io复用
当一个客户处理多个套接字时,这种情况很少见,但也可能出现
当一个TCP服务器既要处理监听套接字,又要处理已连接的套接字,一般用IO复用
如果一个服务器既要适用TCP,又要适用UDP 一般要用IO复用
如果一个服务器要处理多个服务或者多个协议,一般就要用IO复用

信号驱动式,时间驱动

 

异步IO

 

区别

并发和并行

并发(concurrency)

一段时间,一次处理多件事(单个CPU同一段时间处理多个任务)

并行(paralle)

同一时刻,一次做多件事(多核CPU同时执行)

目的实现高并发!

同步和异步

异步
不阻塞,有请求,先响应,然后有了结果再返回

同步
阻塞, 一直等着,直到得到结果

异步的实现

回调函数,通过事件循环,依靠后台函数轮询得到结果
协程 轻量级线程python go 语言层面已经支持

处理多任务操作

多线程、多进程
IO模型
协程

计算密集型, IO密集型

计算密集型:程序大多数时间花在了运算上 很少产生IO操作

IO密集型:程序大多数时间都花在了IO操作而不是运算上

实现模式:

线程一个挂了都挂了 进程不会

多进程的缺点
创建进程的代价很大
linux:通过fork函数创建 windows 通过CreatProcess创建

GIL global interpreter lock全局解析器锁


多线程:IO密集型:文件读写 网络传输

多进程:计算密集型, CPU密集型的计算问题

多线程的注意问题 锁

Linux 底层
互斥锁 mutex:mutual exclusion
保护共享变量:解决多个线程更改一个共享变量的问题,提供互斥机制
互斥锁是类型为pthread_mutex_t的变量

多线程示例
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "董礼"
# File: t1.py
# Date: 2018/7/13

from threading import Thread
import threading
import time
import logging
import requests
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed

"""线程加日志"""
logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)-10s:%(message)s'
)

"""线程名字"""


def count_num(n):
    while n > 0:
        print(threading.current_thread().name, n)  # 线程名字
        n -= 1
        time.sleep(1)


"""线程锁"""
# lock = threading.Lock()  # 加锁
lock = threading.RLock()  # 加多重锁


def count_num_reduce(n):
    with lock:
        n -= 1
        return n


def count_num(n):
    with lock:
        while n > 0:
            logging.debug(f'{n}')  # 日志打印 自带线程名
            n = count_num_reduce(n)
            time.sleep(1)


"""新建及结束多线程"""
thread_list = []
for i in range(3):
    t = Thread(target=count_num, args=(3,), name=f'dongli-{i+1}')  # 新建线程
    t.start()
    thread_list.append(t)

for i in thread_list:  # 结束线程
    i.join()

"""多线程类"""


class Mythread(Thread):
    def __init__(self, count):
        Thread.__init__(self)
        self.count = count

    def run(self):
        count_num(self.count)


for i in range(3):
    t = Mythread(3)
    t.start()

"""多线程取结果"""

ls = [1, 2, 3, 4, 5]
q = Queue()


def ls_num():
    for i in ls:
        q.put(str(i))


class Mythread(Thread):
    def __init__(self):
        Thread.__init__(self)

    def run(self):
        ls_num()


t = Mythread()
t.start()

q = [q.get() for _ in range(len(ls))]
print(q)

"""多线程分析"""


def num():  # 不适合多线程
    for i in range(1000):
        for j in range(0, i):
            i * j


print('no thread')
start = time.time()
num()
print((time.time() - start) * 1000)

print('with thread')
start_t = time.time()
for i in range(5):
    t = Thread(target=num)
    t.start()
print((time.time() - start_t) * 1000)

urls = ['http://baidu.com', 'http://jd.com', 'http://taobao.com']


def down(url):  # 适合多线程
    response = requests.get(url).text
    res = response[:100]
    with open('x.txt', 'a') as f:
        f.write(res)


print('no thread')
start = time.time()
for i in urls:
    down(i)
print((time.time() - start) * 1000)

print('with thread')
start_t = time.time()
for i in urls:
    t = Thread(target=down, args=(i,))
    t.start()
print((time.time() - start_t) * 1000)

"""线程池"""

urls = [
    'http://jd.com',
    'http://taobao.com',
    'http://qq.com'
]


def down(url):
    r = requests.get(url)
    return r.text[:10]


"""用法一"""
with ThreadPoolExecutor(5, thread_name_prefix='dongli') as th:
    futures = [th.submit(down, i) for i in urls]
    for i in as_completed(futures):
        print(i.result())

"""用法二"""
with ThreadPoolExecutor(5, 'donlgi') as th:
    futures = th.map(down, urls)

for i in futures:
    print(i)

多进程示例
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "董礼"
# File: process.py
# Date: 2018/7/17
import math
import logging
import time
from multiprocessing import Process, Pool, get_logger, log_to_stderr, Lock
from concurrent.futures import ProcessPoolExecutor

logging.basicConfig(
    level=logging.DEBUG,
    format='%(process)--10d %(message)s'
)

"""新建多进程"""

r_ls = [1, 2, 3, 4, 5]


def circle_area(r):
    print(math.pi * r * r)
    return math.pi * r * r


if __name__ == '__main__':
    p_ls = []
    p = Process(target=circle_area, args=(5,))
    p.start()
    p_ls.append(p)

    for i in p_ls:
        i.join()

"""默认进程池"""
if __name__ == '__main__':

    p = Pool(2)
    res_ls = []
    for i in r_ls:
        result = p.apply_async(circle_area, args=(i,))
        res_ls.append(result)

    for i in res_ls:
        print(i.get())

"""加日志"""
if __name__ == '__main__':

    log_to_stderr()
    get_logger()

    p = Pool(2)
    res_ls = []
    lock = Lock()
    for i in r_ls:
        with lock:
            result = p.apply_async(circle_area, args=(i,))
            res_ls.append(result)

    for i in res_ls:
        logging.debug(i.get())

"""耗时对比"""


def warp(fun):
    def warpper():
        start = time.time()
        fun()
        print('mulit', (time.time() - start) * 1000)

    return warpper


ls = [i for i in range(10, 1, -1)]


def circle_area(r):
    return math.pi * r * r


if __name__ == '__main__':

    @warp
    def mulit_process():
        res_ls = []
        p = Pool(4)
        for i in ls:
            res = p.apply_async(circle_area, args=(i,))
            res_ls.append(res)
        for i in res_ls:
            print(i.get())


    def no_process():
        start = time.time()
        r = map(circle_area, ls)
        print(list(r))
        print('no', (time.time() - start) * 1000)


    def pool_pro():
        start = time.time()
        with ProcessPoolExecutor() as p:
            r = p.map(circle_area, ls)
            for i in r:
                print(i)
        print('pool', (time.time() - start) * 1000)


    mulit_process()
    # no_process()
    # pool_pro()

协程

多进程,多线程

系统调度
- 上下文切换
- 占用内存
- 耗费CPU时间

 

协程:coroutine/coro

- 轻量级线程(一个线程)
- 调度由用户控制
- 有独立的寄存器上下文和栈
- 切换时保存状态,回来时恢复

Python中协程的发展

•Python 2.5 coroutine-like functionality, (PEP 342) -> yield
•Python 3.3 improves this ability, by supporting delegating to a
subgenerator (PEP 380) -> yield from
•Python 3.4 asynchronous I/O framework as standardized in PEP
3156, -> asyncio
•Python 3.5 async/await syntax (PEP 0492).
•Eventlet
•Greenlet
•gevent

➢yield,send
➢ yield from
➢ asyncio
➢ async/await

 

协程1期:模拟阶段

- 嵌套yield的函数交互运行
- next/send(None)激活yield
- send(data)向yield发送数据

协程2期:过渡阶段

- 通过yield from调用另个生成器
- 通过asyncio的装饰器添加协程功能

asyncio:一个神奇的异步io库
配合yield from实现了真的协程

The word "coroutine", like the word "generator", is
used for two different (though related) concepts:
•The function that defines a coroutine (a function
definition decorated with asyncio.coroutine).
If disambiguation is needed we will call this
a coroutine function.
•The object obtained by calling a coroutine function.
This object represents a computation or an I/O
operation (usually a combination) that will complete
eventually. If disambiguation is needed we will call it
a coroutine object

Things a coroutine can do:
• result = yield from future -- suspends the
coroutine until the future is done, then returns the
future's result, or raises an exception, which will be
propagated.
• result = yield from coroutine -- wait for another
coroutine to produce a result (or raise an exception,
which will be propagated). The coroutine expression
must be a call to another coroutine

代码演示
import asyncio

"""yield 模拟协程一"""


def a():
    while True:
        s = yield
        print(s)


def b():
    x = a()
    next(x)
    while True:
        msg = input('>>')
        x.send(msg)


b()

"""yield 模拟协程二"""

def a():
    r = ''
    while True:
        n = yield r
        print(n)
        r = 'ok'


def b():
    x = a()
    x.send(None)
    for i in range(10):
        t = x.send(i)
        print(t)


b()


"""yield from"""

def a(x):
    res = yield
    print('start', x)
    return f'end {res}'


def b(y):
    x = yield from a(y)
    print(x)

try:
    x = b('python')
    x.send(None)
    x.send('php')
except StopIteration:
    pass

"""from asyncaic"""


def a(x):
    res = yield
    print('start', x)
    return f'end {res}'


@asyncio.coroutine
def b(y):
    x = yield from a(y)
    print(x)


try:
    x = b('python')
    x.send(None)
    x.send('php')
except StopIteration:
    pass


最后更新于:2018-07-18 13:41:52