Python middleware


memcached caching system

Open source, with a simple protocol
Efficient: written in C, event handling built on libevent
Fast, purely in-memory storage
Client/server architecture; easy to talk to over TCP
When memory fills up, items are evicted using an LRU policy

Data lives only in memory and is lost on shutdown; memcached is a cache, not a persistent database


Installation

yum install memcached
whereis memcached or which memcached to locate the binary
memcached -h shows help; -m sets the maximum memory to use (default 64 MB); -c sets the maximum number of simultaneous connections
/usr/bin/memcached -p 11211 -m 128m -vv -u root
Usage (text protocol): set <key> <flags> <exptime> <bytes>, press Enter, then the value on the next line

Usage

pip install pymemcache

import json
from pymemcache.client.base import Client

client = Client(('192.168.66.189', 11211))
client.set('name', 'dongli')
client.get('name')

tv = {'ch': ['ch1', 'ch2'], 'mudan': ['md1', 'md2']}
client.set('menu_tv', json.dumps(tv))  # memcached stores bytes/strings, so serialize complex values with JSON

import time
import json
from pymemcache.client.base import Client


# Demonstrate the cache-aside workflow with memcached
def get_data():
    """Simulate a slow data source (e.g. a database query)."""
    data = {'ch': ['ch1', 'ch2'], 'md': ['mdA', 'mdB']}
    time.sleep(3)
    return data


def show_data(data):
    for k, v in data.items():
        print(k, v)


def mind_data(k, data):
    """Write the data into the memcached cache."""
    client = Client(('192.168.66.189', 11211))
    res = client.set(k, json.dumps(data))
    return res


def get_cache(k):
    """Read and deserialize cached data; return False on a miss or error."""
    try:
        client = Client(('192.168.66.189', 11211))
        data = json.loads(client.get(k))
        return data
    except Exception as e:
        print(e)
        return False


if __name__ == '__main__':
    k = 'test'
    data = get_cache(k)
    if data:
        show_data(data)
    else:
        data = get_data()
        show_data(data)
        mind_data(k, data)

memcached vs. Redis

              memcached               Redis
Performance   high, multi-threaded    single-threaded, event-driven
Data types    strings only            rich data types
Reliability   no persistence          persistence supported

Redis publish and subscribe

Publish-subscribe is a messaging pattern in which senders do not deliver messages directly to specific subscribers; instead, published messages are grouped into categories (channels) without the publisher knowing which subscribers, if any, exist. Likewise, subscribers express interest in one or more categories and receive only the messages they care about, without knowing which publishers exist.

subscribe: subscribe to a channel
publish: publish a message to a channel

import redis

# Publish
c1 = redis.StrictRedis(host='192.168.66.189', port=6666)
c1.publish('dongli', 'hello')

# Subscribe
c2 = redis.StrictRedis(host='192.168.66.189', port=6666)
s1 = c2.pubsub()
s1.subscribe('dongli')
for m in s1.listen():
    print(m)


import time
import json
import redis


# Demonstrate the Redis caching workflow (data split across two databases)
def get_data(data_type):
    """Simulate a slow data source; return different data per category."""
    if data_type == 'tv':
        data = {'ch': ['ch1', 'ch2'], 'md': ['mdA', 'mdB']}
    else:
        data = {'lenovo': ['le1', 'le2'], 'mi': ['miA', 'miB']}
    time.sleep(3)
    return data


def show_data(data):
    for k, v in data.items():
        print(k, v)


def mind_data(k, data, data_type):
    """Store data in a separate Redis database per category."""
    if data_type == 'tv':
        client = redis.StrictRedis(host='192.168.66.189', port=6666, db=0)
    else:
        client = redis.StrictRedis(host='192.168.66.189', port=6666, db=1)
    res = client.set(k, json.dumps(data))
    return res


def get_cache(k, data_type):
    """Read cached data from the matching database; return False on a miss or error."""
    try:
        if data_type == 'tv':
            client = redis.StrictRedis(host='192.168.66.189', port=6666, db=0)
        else:
            client = redis.StrictRedis(host='192.168.66.189', port=6666, db=1)
        data = json.loads(client.get(k))
        return data
    except Exception as e:
        print(e)
        return False


if __name__ == '__main__':
    k = 'test'
    data = get_cache(k, data_type='phone')
    if data:
        show_data(data)
    else:
        data = get_data(data_type='phone')
        show_data(data)
        mind_data(k, data, data_type='phone')

RabbitMQ

RabbitMQ is the most widely deployed open-source message broker.

What a message broker is used for

Routing messages to one or more destinations
Transforming messages into alternative representations
Performing message aggregation: splitting a message into several messages, sending them to their destinations, then recombining the responses into one message for the user
Interacting with external repositories to augment or store messages
Calling web services to retrieve data
Responding to events or errors
Providing content-based and topic-based message routing using the publish-subscribe pattern

Messaging protocol

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Message flow: publisher -> publishes to an exchange -> the exchange routes to queues -> consumers consume from queues

AMQP is a programmable protocol.

It defines three kinds of entities (objects), illustrated in the sketch below:
queues: store messages until they are consumed
exchanges: routing hubs that publishers send messages to; several types exist
bindings: routing rules that define how an exchange routes messages to queues
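A minimal sketch of these three entities in Python with pika, reusing the host and the dongli/dongli credentials that appear later in these notes (the exchange, queue and routing-key names are made up for illustration):

import pika

cred = pika.PlainCredentials(username='dongli', password='dongli')
client = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.66.189', credentials=cred))
channel = client.channel()

# The three AMQP entities: an exchange, a queue, and a binding between them
channel.exchange_declare(exchange='demo_ex', exchange_type='direct')
channel.queue_declare(queue='demo_q')
channel.queue_bind(queue='demo_q', exchange='demo_ex', routing_key='demo_key')

# The exchange routes this message to demo_q because the routing key matches the binding
channel.basic_publish(exchange='demo_ex', routing_key='demo_key', body='hello')
client.close()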

Exchange types

direct exchange: the default; routes by exact routing-key match (the default exchange binds every queue by its own name)
fanout exchange: amq.fanout; broadcasts to every bound queue
topic exchange: routes by pattern matching on the routing key
headers exchange: routes by matching key-value message headers

Exchange attributes (declared as in the sketch below)
name: the exchange name
durability: whether the exchange survives a broker restart
auto-delete: the exchange is deleted once all queues have unbound from it
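A hedged sketch of declaring an exchange with these attributes via pika (connection values reused from elsewhere in these notes; the exchange name 'events' is invented):

import pika

cred = pika.PlainCredentials(username='dongli', password='dongli')
client = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.66.189', credentials=cred))
channel = client.channel()

channel.exchange_declare(exchange='events',        # name
                         exchange_type='direct',
                         durable=True,              # durability: survives a broker restart
                         auto_delete=True)          # auto-delete: removed once the last queue unbinds
client.close()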

direct exchange (the default)
When a queue is declared, it is bound to the default exchange with a routing key equal to the queue name.
Use case: distributing tasks among several workers, each doing a specific job, for example writing logs.

fanout exchange
Delivers every message to every bound queue, ignoring the routing key.
Use cases: writing logs to several places, broadcasting notifications, pushing live match scores, distributing configuration updates across a distributed system.

topic exchange
Routes each message to the queues whose binding pattern matches its routing key, enabling publish/subscribe (see the sketch below).
* matches exactly one word; # matches zero or more words.
Use cases: delivering news updates by tag, offering products based on location.
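A minimal topic-exchange sketch with pika, again reusing the connection values from these notes (the exchange, queue and routing keys are invented for illustration):

import pika

cred = pika.PlainCredentials(username='dongli', password='dongli')
client = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.66.189', credentials=cred))
channel = client.channel()

channel.exchange_declare(exchange='news', exchange_type='topic')
channel.queue_declare(queue='sports_q')
# '#' matches zero or more words, so this binding catches every routing key starting with 'sports.'
channel.queue_bind(queue='sports_q', exchange='news', routing_key='sports.#')

# Routed to sports_q: 'sports.football' matches 'sports.#'
channel.basic_publish(exchange='news', routing_key='sports.football', body='goal!')
# Not routed to sports_q: 'finance.stocks' does not match
channel.basic_publish(exchange='news', routing_key='finance.stocks', body='up 2%')
client.close()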

headers exchange
Matches on one or more message header attributes and ignores the routing key; the developer has to define more of the matching logic.
Use case: when routing cannot be expressed as a single routing-key string, custom header attributes can be used for matching.

Redis publish/subscribe example

import redis

client = redis.StrictRedis(host='192.168.66.189', port=6666)
ch = ['one', 'two', 'three']

# Show the available channels
for i in ch:
    print(i)

while True:
    select_ch = input('Select a channel: ')
    msg = input('Message to send (q to quit): ')
    if msg == 'q':
        break
    client.publish(select_ch, msg)

import redis

client = redis.StrictRedis(host='192.168.66.189', port=6666)

ch = ['one', 'two', 'three']

# Alternative: subscribe to a single, interactively chosen channel
# for i in ch:
#     print(i)
# ch_select = input('Select a channel: ')
#
# sub = client.pubsub()
# sub.subscribe(ch_select)
# for messenger in sub.listen():
#     if messenger['type'] == 'message':
#         print(messenger['data'].decode())

sub = client.pubsub()
sub.subscribe(ch[:2])  # subscribe() also accepts a list of channels
for i in sub.listen():
    print(i)

RabbitMQ installation

Install Erlang

vi /etc/yum.repos.d/rabbitmq-erlang.repo

Paste the following:
# In /etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

yum clean all
yum makecache
yum remove erlang-erts
yum install erlang

rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.8/rabbitmq-server-3.7.8-1.el7.noarch.rpm

yum install rabbitmq-server-3.7.8-1.el7.noarch.rpm

systemctl start rabbitmq-server
rabbitmqctl status

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl add_user dongli dongli                        # create the user
rabbitmqctl set_user_tags dongli administrator            # tag it as administrator
rabbitmqctl set_permissions -p / dongli ".*" ".*" ".*"    # grant configure/write/read permissions on the / vhost

RabbitMQ: sending and receiving messages


import pika

# Connect to RabbitMQ
cred = pika.PlainCredentials(username='dongli', password='dongli')
params = pika.ConnectionParameters(host='192.168.66.189', credentials=cred)
client = pika.BlockingConnection(params)

# Declare the queue
channel = client.channel()
channel.queue_declare(queue='msg_queue')

# Send a message; the default (direct) exchange routes it by queue name
msg = 'dongli is good!'
channel.basic_publish(exchange='', routing_key='msg_queue', body=msg)
print('Sent message:', msg)

import pika

cred = pika.PlainCredentials(username='dongli', password='dongli')
params = pika.ConnectionParameters(host='192.168.66.189', credentials=cred)
client = pika.BlockingConnection(params)

channel = client.channel()
channel.queue_declare(queue='msg_queue')


def callback(ch, method, properties, body):
    print('Received message:', body)


# Receive messages (this callback-first signature is the pika 0.x API)
channel.basic_consume(callback, queue='msg_queue')
channel.start_consuming()

RabbitMQ: message persistence and fair dispatch (load balancing)

import pika
import sys

CONF = {
    'username': 'dongli',
    'password': 'dongli',
    'host': '192.168.66.189',
    'port': '5672'}

# Connect to RabbitMQ
cred = pika.PlainCredentials(username=CONF['username'], password=CONF['password'])
params = pika.ConnectionParameters(host=CONF['host'], credentials=cred)
client = pika.BlockingConnection(params)

# Declare the queue
channel = client.channel()
channel.queue_declare(queue='msg', durable=True)  # a durable queue survives a broker restart

# Send the message
msg = ' '.join(sys.argv[1:]) or 'dongli is good!'
channel.basic_publish(exchange='', routing_key='msg', body=msg,
                      properties=pika.BasicProperties(delivery_mode=2))  # delivery_mode=2 makes the message persistent
print('Sent message:', msg)


import time
import pika

cred = pika.PlainCredentials(username='dongli', password='dongli')
params = pika.ConnectionParameters(host='192.168.66.189', credentials=cred)
client = pika.BlockingConnection(params)

channel = client.channel()
channel.queue_declare(queue='msg', durable=True)


def callback(ch, method, properties, body):
    print('Received message:', body.decode())
    time.sleep(body.count(b'-'))  # simulate work: one second per '-' in the message
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge only after the work is done


# Receive messages
channel.basic_qos(prefetch_count=1)  # fair dispatch: hand each worker one unacked message at a time
channel.basic_consume(callback, queue='msg')
channel.start_consuming()

RabbitMQ publish/subscribe (fanout)

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "董礼"
# File: base
# Date: 2018/10/15


import pika


class AdminMQ:
    """Base class wrapping common RabbitMQ (pika) operations."""

    def __init__(self, conf):
        self.conf = conf
        self.client = self.make_connect()
        self.channel = self.client.channel()

    def make_connect(self):
        """Create a blocking connection to RabbitMQ."""
        cred = pika.PlainCredentials(username=self.conf['username'], password=self.conf['password'])
        params = pika.ConnectionParameters(host=self.conf['host'], credentials=cred)
        client = pika.BlockingConnection(params)
        return client

    def make_exchange(self, exchange):
        """Declare a fanout exchange."""
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')

    def random_queue(self):
        """Declare an exclusive, server-named queue and return its generated name."""
        q = self.channel.queue_declare(exclusive=True)
        return q.method.queue

    def bind_queue(self, queue, exchange):
        """Bind a queue to an exchange."""
        self.channel.queue_bind(queue=queue, exchange=exchange)

    def make_queue(self, queue_name):
        """Declare a durable queue."""
        self.channel.queue_declare(queue=queue_name, durable=True)

    def publish(self, exchange, queue_name, msg):
        """Publish a persistent message to the given exchange and routing key."""
        self.channel.publish(exchange=exchange, routing_key=queue_name, body=msg,
                             properties=pika.BasicProperties(delivery_mode=2))
        print('Sent message:', msg)

    def consume(self, callback, queue_name):
        """Consume messages with fair dispatch (prefetch_count=1)."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(callback, queue=queue_name)
        self.channel.start_consuming()

    def close_connect(self):
        self.client.close()

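The scripts below import conf.CONF from a cache package; its exact contents are not shown in these notes, but a minimal cache/conf.py mirroring the CONF dict used earlier would look like this (assumed layout):

# cache/conf.py (assumed layout, mirroring the CONF dict shown earlier)
CONF = {
    'username': 'dongli',
    'password': 'dongli',
    'host': '192.168.66.189',
    'port': '5672',
}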


from cache import basemq
from cache import conf


class Pubmq(basemq.AdminMQ):
    """Publisher side of the fanout publish/subscribe example."""
    def __init__(self, conf):
        super().__init__(conf)


mq = Pubmq(conf.CONF)
mq.make_exchange(exchange='exch')
msg = 'hello dongli'
mq.publish(exchange='exch', queue_name='', msg=msg)

from cache import basemq
from cache import conf


class Submq(basemq.AdminMQ):
    """Subscriber side of the fanout publish/subscribe example."""
    def __init__(self, conf):
        super().__init__(conf)

    def callback(self, ch, method, properties, body):
        print(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


mq = Submq(conf.CONF)
mq.make_exchange(exchange='exch')
q_name = mq.random_queue()
mq.bind_queue(q_name, 'exch')
mq.consume(mq.callback, queue_name=q_name)

RPC over RabbitMQ


import uuid
import pika
from cache import basemq
from cache import conf


class Rpc_client(basemq.AdminMQ):
    """RPC client."""

    def __init__(self, config):
        """
        result holds the reply received from the server.
        corr_id is the correlation id used to match a reply to this request.
        queue is the exclusive callback queue that the server replies to.
        """
        super().__init__(config)
        self.result = None
        self.corr_id = str(uuid.uuid4())
        self.queue = self.random_queue()
        self.channel.basic_consume(self.callback, queue=self.queue)

    def callback(self, ch, method, properties, body):
        """If the reply's correlation_id matches ours, the reply belongs to this request."""
        if self.corr_id == properties.correlation_id:
            self.result = body
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def call(self, r):
        """Call the remote procedure; reply_to, correlation_id and routing_key must match what the server side uses."""
        self.channel.publish(exchange='', routing_key='rpc',
                             properties=pika.BasicProperties(
                                 reply_to=self.queue, correlation_id=str(self.corr_id)),
                             body=str(r))

        while self.result is None:
            # Drive the I/O loop until the reply arrives
            self.client.process_data_events()

        return self.result


if __name__ == '__main__':
    rpc_client = Rpc_client(conf.CONF)
    res = rpc_client.call(5)
    print(res)

import pika
from cache import basemq
from cache import conf


class Rpc_server(basemq.AdminMQ):
    """RPC server."""

    def __init__(self, config):
        super().__init__(config)

    def calc(self, r):
        """The remote procedure: square the argument."""
        return r * r

    def callback(self, ch, method, properties, body):
        """Send the result back with ch.basic_publish to the caller's reply_to queue."""
        res = self.calc(float(body))
        ch.basic_publish(exchange='', routing_key=properties.reply_to,
                         properties=pika.BasicProperties(correlation_id=str(properties.correlation_id)),
                         body=str(res))
        ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    rpc_server = Rpc_server(conf.CONF)
    rpc_server.make_queue('rpc')
    rpc_server.consume(rpc_server.callback, queue_name='rpc')

Distributed task queue: Celery

Asynchronous, real-time processing
Easy to scale out
Supports multiple message transports
Uses the AMQP protocol

Use cases

Background jobs
Asynchronous operations
Periodic work
A degree of distributed computing

Key concepts
task queue: the queue that holds the work to be done
task: a unit of work
broker: the message broker that transports tasks
worker: the process that executes tasks

publisher -> broker -> worker -> result store

Installation

pip install celery

- sending tasks
- running tasks on a schedule
- running workers concurrently on multiple machines

Vim configuration (optional)
vim ~/.vimrc
set ts=4
set expandtab
set nu

-------- task.py: define the task ----------------

import time
from celery import Celery

app = Celery('task', broker='amqp://', backend='redis://192.168.66.189:6666')


@app.task
def worker(name):
    print(name)
    time.sleep(2)
    return name

Redis URL format: redis://:password@hostname:port/db_number
AMQP URL format: amqp://user:pwd@hostname:port/vhost

Start a worker:
celery worker -A task -l info

Run a task manually (e.g. in a Python shell):
from task import worker
w = worker.delay('dongli')   # submit the task
w.ready()                    # True once the task has finished
w.get(timeout=2)             # fetch the result, waiting at most 2 seconds

Run it from a script:

from task import worker


def execute(name):
    w = worker.delay(name)
    while not w.ready():
        pass
    res = w.get()
    print(res)
    return res


if __name__ == '__main__':
    execute('dongli')

Vim tips
* yyp duplicates the current line
* dw deletes a word

Scheduled execution:
Create config.py:

from datetime import timedelta


CELERYBEAT_SCHEDULE = {
    'task-15s': {
        'task': 'mytask.worker',            # module_name.function_name
        'schedule': timedelta(seconds=15),
        'args': ('dongli',)
    }
}

In the task module, add app.config_from_object('config') as shown in the sketch below (the argument is the name of the config module created above)
Run the scheduler: celery beat -A mytask -l info -D    # -D runs it in the background
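Putting the pieces together, the task module might look like the sketch below (based on the task.py above, renamed to mytask so it matches the 'mytask.worker' entry in the schedule):

import time
from celery import Celery

app = Celery('mytask', broker='amqp://', backend='redis://192.168.66.189:6666')
app.config_from_object('config')   # load CELERYBEAT_SCHEDULE from config.py


@app.task
def worker(name):
    print(name)
    time.sleep(2)
    return name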

ps aux | grep celery | grep -v grep | awk '{print$2}'| xargs kill    # kill the background celery processes

Crontab scheduling

Crontab schedules
If you want finer control over when a task executes, for example at a specific time of day or on a specific day of the week, you can use the crontab schedule type:

from celery.schedules import crontab

CELERY_TIMEZONE = 'Asia/Shanghai'
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Distributed execution

Create the same task module on multiple servers and start a worker on each.
Redis configuration: adjust bind, protected-mode and requirepass in redis.conf so remote workers can reach the broker/result backend (a hedged sketch follows).
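A hedged sketch of what the task module on each extra machine could look like once Redis accepts remote connections and requires a password ('mypassword' is a placeholder, not a value from these notes):

from celery import Celery

# ':mypassword' corresponds to the requirepass value set in redis.conf (placeholder)
app = Celery('task',
             broker='redis://:mypassword@192.168.66.189:6666/0',
             backend='redis://:mypassword@192.168.66.189:6666/1')


@app.task
def worker(name):
    return name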


Last updated: 2018-10-17 15:58:45