当前位置:   article > 正文

Python多线程和多进程_python process active_children

python process active_children

1.基本概念

定义含义
并发concurrency,同一时刻只能有一条指令执行,但是多个线程的对应的指令被快速轮换地执行
并行parallel,同一时刻,有多条指令在多个处理器上同时执行,并行必须要依赖于多个处理器
阻塞程序未得到所需计算资源时被挂起的状态
非阻塞程序在等待某操作过程中,自身不被阻塞,可以继续处理其他的事情
同步不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,我们称这些程序单元是同步执行的
异步为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的

2.多线程

0.常用的线程方法

# 如上所述,创建一个线程
t=Thread(target=func)

# 启动子线程
t.start()

# 阻塞子线程,主线程等待子线程运行完毕之后才退出
t.join()

# 判断线程是否在执行状态,在执行返回True,否则返回False
t.is_alive()
t.isAlive()

# 设置守护线程,随主线程退出而退出,默认为False
thread.setDaemon(True)

# 设置线程名
t.name = "My-Thread"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1.用函数创建多线程

import threading
import time


def target(second):
    print(f'{threading.current_thread().name} is running')
    print(f'{threading.current_thread().name} sleep {second}s')
    time.sleep(second)
    print(f'{threading.current_thread().name} is ended')


print(f'{threading.current_thread().name} is running')
for i in [1, 5]:
    thread = threading.Thread(target=target, args=[i])
    thread.start()
print(f'{threading.current_thread().name} is ended')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
MainThread is running
Thread-1 is running
Thread-1 sleep 1s
Thread-2 is running
Thread-2 sleep 5s
MainThread is ended
Thread-1 is ended
Thread-2 is ended
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.用类创建多线程

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, second):
        threading.Thread.__init__(self)
        self.second = second

    def run(self):
        print(f'{threading.current_thread().name} is running')
        print(f'{threading.current_thread().name} sleep {self.second}s')
        time.sleep(self.second)
        print(f'{threading.current_thread().name} is ended')


print(f'{threading.current_thread().name} is running')
threads = []
for i in [1, 5]:
    thread = MyThread(i)
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()
print(f'{threading.current_thread().name} is ended')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
MainThread is running
Thread-1 is running
Thread-1 sleep 1s
Thread-2 is running
Thread-2 sleep 5s
Thread-1 is ended
Thread-2 is ended
MainThread is ended
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3.锁

锁的分类:互斥锁,可重入锁

锁的应用场景:确保同一时间只有一个线程操作数据

1.互斥锁
import threading
import time

count = 0


class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global count

        lock.acquire()
        temp = count + 1
        time.sleep(0.001)
        count = temp
        lock.release()


lock = threading.Lock()
threads = []
for _ in range(100):
    thread = MyThread()
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
print(f'Final count: {count}')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
2.可重入锁
import threading
import time

count = 0


class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        n = 0
        with lock:  
            for i in range(10):
                n += 1
                with lock:
                    print(n)


lock = threading.RLock()
thread1 = MyThread()
thread1.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
3.防止死锁的加锁机制
import threading
from contextlib import contextmanager

# Thread-local state to stored information on locks already acquired
_local = threading.local()


@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


x_lock = threading.Lock()
y_lock = threading.Lock()


def thread_1():
    while True:
        with acquire(x_lock):
            with acquire(y_lock):
                print('Thread-1')


def thread_2():
    while True:
        with acquire(y_lock):
            with acquire(x_lock):
                print('Thread-2')


if __name__ == '__main__':
    t1 = threading.Thread(target=thread_1)
    t1.daemon = True
    t1.start()

    t2 = threading.Thread(target=thread_2)
    t2.daemon = True
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

3.多进程

0.常用的进程方法

# 启动子进程
t.start()

# 进程等待,主线程等待子线程运行完毕之后才退出
t.join()

# 判断进程是否在执行状态,在执行返回True,否则返回False
t.is_alive()

# 守护进程,随进程退出而退出,默认为False
t.daemon = True

# 设置主进程名
t.name = "My_Process"

#终止子进程
p.terminate()
p.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1.用函数创建多进程

import time
import multiprocessing


def process(index):
    time.sleep(index)

    print(f'Process: {index}')


if __name__ == '__main__':
    print(f'CPU number: {multiprocessing.cpu_count()}')
    for i in range(5):
        p = multiprocessing.Process(target=process, args=[i])
        p.start()

    for p in multiprocessing.active_children():
        print(f'Child process name: {p.name} id: {p.pid}')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2.用类创建多进程

import time
from multiprocessing import Process


class MyProcess(Process):

    def __init__(self, loop):
        super().__init__()
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f'Pid: {self.pid} LoopCount: {count}')


if __name__ == '__main__':

    for i in range(2, 5):
        p = MyProcess(i)
        p.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

3.锁

锁的分类:互斥锁,可重入锁

锁的应用场景:确保同一时间只有一个线程操作数据

1.互斥锁
from multiprocessing import Process, Lock
import time


class MyProcess(Process):

    def __init__(self, loop, lock):
        super().__init__()
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            self.lock.acquire()
            print(f'Pid: {self.pid} LoopCount: {count}')
            self.lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(1, 5):
        p = MyProcess(i, lock)
        p.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
2.可重入锁
from multiprocessing import Process, RLock


class MyProcess(Process):

    def __init__(self, lock):
        super().__init__()
        self.lock = lock

    def run(self):
        n = 0
        with self.lock:
            for i in range(10):
                n += 1
                with self.lock:
                    print(n)


if __name__ == '__main__':
    lock = RLock()
    thread1 = MyProcess(lock)
    thread1.start()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
3.防止死锁的加锁机制

4.信号量

背景:允许多个进程来访问共享资源,同时还需要限制能访,问共享资源的进程的数量

from multiprocessing import Process, Semaphore, Lock, Queue
import time


class Base(Process):
    def __init__(self, buffer1, empty1, full1, lock1):
        super(Base, self).__init__()
        self.buffer = buffer1
        self.empty = empty1
        self.full = full1
        self.lock = lock1


class Consumer(Base):
    def run(self):
        while True:
            self.full.acquire()
            self.lock.acquire()
            self.buffer.get()
            print('Consumer pop an element')
            time.sleep(1)
            self.lock.release()
            self.empty.release()


class Producer(Base):
    def run(self):
        while True:
            self.empty.acquire()
            self.lock.acquire()
            self.buffer.put(1)
            print('Producer append an element')
            time.sleep(1)
            self.lock.release()
            self.full.release()


if __name__ == '__main__':
    buffer = Queue(10)
    empty = Semaphore(2)
    full = Semaphore(0)
    lock = Lock()

    p = Producer(buffer, empty, full, lock)
    c = Consumer(buffer, empty, full, lock)
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Main Process Ended')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

3.多线程和多进程之间的比较

名称含义应用场景
多线程操作系统进行运算调度的最小单位IO密集型(磁盘IO网络IO数据库IO等,譬如爬虫,网站开发等)
多进程系统进行资源分配和调度的一个独立单位CPU密集型(大数据分析,机器学习等)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/151635
推荐阅读
相关标签
  

闽ICP备14008679号