赞
踩
python3(本次用的3.8)、postman、requests、Flask,pip,pipenv等工具
先安装一个环境
pip install pipenv
pipenv使用
创建环境
pipenv install
会生成一个pipfile文件,用于管理库的依赖
在虚拟环境中安装依赖
pipenv install flask==2.0.2
pipenv install requests==2.18.4
安装成功后可看到pipfile中看到
启动虚拟环境
pipenv shell
新建一个blockchain.py 开始撸代码
{
"index":0, // 块序号
"timestamp":"",// 时间戳
"transactions":""[ // 交易信息
{
"sender":"",
"recipient":"",
"amount":5,
}
],
"proof":"", // 工作量证明
"previous_hash":"",// 前区块的哈希值
}
class Blockchain: def __init__(self): self.chain = [] # 存块 self.current_transactions = [] # 交易实体 def new_block(self): # 新建区块 pass def new_transaction(self):# 新建交易 pass @staticmethod def hash(block):# 计算哈希值 pass @property def last_block(self):# 获取当前链中最后一个区块 pass def proof_of_work(self):# 工作量证明计算 pass def vaild_proof(self):# 验证计算值是否符合要求 pass def vaild_chain(self):# 验证链是否符合要求 pass def register_node(self):# 节点注册 pass def resolve_conflicts:# 共识算法,解决冲突 pass
def new_transaction(self, sender: str, recipient: str, amount: int) -> int: """添加新的交易 Args: sender (str): 发送方 recipient (str): 接收方 amount (int): 金额 Returns: int: 返回一个包含此交易的区块序号 """ self.current_transactions.append({ 'sender': sender, 'recipient': recipient, 'amount': amount }) return self.last_block['index'] + 1
def new_block(self, proof: int, previous_hash=None): # 新建区块
block = {
'index': len(self.chain) + 1,
'timestamp': time(),
'transactions': self.current_transactions,
'proof': proof,
'previous_hash': previous_hash or self.hash(self.last_block),
}
self.current_transactions = [] # 新建区块打包后重置当前交易信息
self.chain.append(block) # 把新建的区块加入链
return block
@staticmethod
def hash(block: Dict[str, Any]) -> str:
"""计算哈希值,返回哈希后的摘要信息
Args:
block (Dict[str, Any]): 传入一个块
Returns:
str: 摘要信息
"""
block_string = json.dumps(block, sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()
def proof_of_work(self, last_proof: int) -> int: """工作量计算,计算一个符合要求的哈希值 Args: last_proof (int): 上一个块的工作量随机数 Returns: int: 返回符合要求的工作量随机数 """ proof = 0 while self.valid_proof(last_proof, proof) is False: proof += 1 # print(proof) 输出计算结果 return proof def valid_proof(self, last_proof: int, proof: int) -> bool: """工作量证明验证,验证计算结果是否以2个0开头 Args: last_proof (int): 前工作证明 proof (int): 当前工作证明 Returns: bool: 返回验证是否有效 """ guess = f'{last_proof}{proof}'.encode() guess_hash = hashlib.sha256(guess).hexdigest() # print(guess_hash) 输出计算过程 if guess_hash[0:2] == "00": return True else: return False
验证一下,创建一个类,设定前一个工作量证明为100
尝试运行代码
可以看到这里算出前两位为0的就停止,结果为226
def register_node(self, address: str) -> None:
"""添加一个新节点到节点集中
Args:
address (str): 节点的地址。Eg:"http://127.0.0.1:5002"
"""
parsed_url = urlparse(address) # 解析url参数
self.nodes.add(parsed_url.netloc) # 获取域名服务器
def resolve_conflicts(self) -> bool: """共识算法,解决冲突,以最长且有效的链为主 Returns: bool: 冲突是否解决成功 """ neighbours = self.nodes # 获取节点信息 new_chain = None # 定义可能的新链 max_length = len(self.chain) # 获取当前链长度 for node in neighbours: # 获取节点的链条信息,如果更长且有效则直接替换 response = requests.get(f'http://{node}/chain') if response.status_code == 200: length = response.json()['length'] chain = response.json()['chain'] if length > max_length and self.vaild_chain(chain): max_length = length new_chain = chain if new_chain: self.chain = new_chain return True return False
使用flask 部署服务器
app = Flask(__name__)
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000)
尝试运行代码
从postman 中可看到一些信息,目前没有定义接口,自然是404
@app.route('/transactions/new', methods=['POST']) def new_transaction(): pass @app.route('/mine', methods=['GET']) def mine(): pass @app.route('/chain', methods=['GET']) def full_chain(): pass @app.route('/nodes/register', methods=['POST']) def register_nodes(): pass @app.route('/nodes/resolve', methods=['GET']) def consensus(): pass
# 添加新建交易接口
@app.route('/transactions/new', methods=['POST'])
def new_transaction():
values = request.get_json()
required = ['sender', 'recipient', 'amount']
if not all(k in values for k in required):
return '确少参数', 400
index = blockchain.new_transaction(values['sender'], values['recipient'],
values['amount'])
response = {'message': f'交易将会被添加到块 {index}'}
return jsonify(response), 201
# 查看链接口
@app.route('/chain', methods=['GET'])
def full_chain():
response = {
'chain': blockchain.chain,
'length': len(blockchain.chain),
}
return jsonify(response), 200
@app.route('/mine', methods=['GET']) def mine(): last_block = blockchain.last_block # 获取链上最后一个区块的信息 last_proof = last_block['proof'] proof = blockchain.proof_of_work(last_proof) # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励 blockchain.new_transaction( sender="0", recipient=node_identifier, amount=1, ) block = blockchain.new_block(proof, None) # 生成一个新块 response = { 'message': "打包成功,新区块已生成!", 'index': block['index'], 'transactions': block['transactions'], 'proof': block['proof'], 'previous_hash': block['previous_hash'], } return jsonify(response), 200
去postman 依次请求chain(查看当前链)、transactions/new(新建交易,注意是post方法提交的json数据)、mine(打包区块)、chain
这时,一个单节点的区块链流程基本实现。
# 节点注册接口 @app.route('/nodes/register', methods=['POST']) def register_nodes(): values = request.get_json() nodes = values.get('nodes') if nodes is None: return "Error: 请提供一个符合规则的节点", 400 for node in nodes: blockchain.register_node(node) response = { 'message': '新节点已经被添加!', 'total_nodes': list(blockchain.nodes), } return jsonify(response), 201
测试一下
# 共识接口
@app.route('/nodes/resolve', methods=['GET'])
def consensus():
replaced = blockchain.resolve_conflicts()
if replaced:
response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain}
else:
response = {'message': '当前链符合要求', 'chain': blockchain.chain}
return jsonify(response), 200
import hashlib import json from time import time from urllib.parse import urlparse # url解析 from uuid import uuid4 # 生成唯一id from flask import Flask, jsonify, request from typing import Any, Dict, List import requests from argparse import ArgumentParser # 命令行参数解析 class Blockchain: def __init__(self): self.chain = [] # 存块 self.current_transactions = [] # 交易实体 self.nodes = set() # 无重复的节点集合 # 创建创世区块 self.new_block(previous_hash='1', proof=100) def register_node(self, address: str) -> None: """添加一个新节点到节点集中 Args: address (str): 节点的地址。Eg:"http://127.0.0.1:5002" """ parsed_url = urlparse(address) # 解析url参数 self.nodes.add(parsed_url.netloc) # 获取域名服务器 def new_block(self, proof: int, previous_hash=None): # 新建区块 block = { 'index': len(self.chain) + 1, 'timestamp': time(), 'transactions': self.current_transactions, 'proof': proof, 'previous_hash': previous_hash or self.hash(self.last_block), } self.current_transactions = [] # 新建区块打包后重置当前交易信息 self.chain.append(block) # 把新建的区块加入链 return block def new_transaction(self, sender: str, recipient: str, amount: int) -> int: """添加新的交易 Args: sender (str): 发送方 recipient (str): 接收方 amount (int): 金额 Returns: int: 返回一个包含此交易的区块序号 """ self.current_transactions.append({ 'sender': sender, 'recipient': recipient, 'amount': amount }) return self.last_block['index'] + 1 @staticmethod def hash(block: Dict[str, Any]) -> str: """计算哈希值,返回哈希后的摘要信息 Args: block (Dict[str, Any]): 传入一个块 Returns: str: 摘要信息 """ block_string = json.dumps(block, sort_keys=True).encode() return hashlib.sha256(block_string).hexdigest() @property def last_block(self) -> Dict[str, Any]: # 获取当前链中最后一个区块 return self.chain[-1] def proof_of_work(self, last_proof: int) -> int: """工作量计算,计算一个符合要求的哈希值 Args: last_proof (int): 上一个块的工作量随机数 Returns: int: 返回符合要求的工作量随机数 """ proof = 0 while self.valid_proof(last_proof, proof) is False: proof += 1 # print(proof) 输出计算结果 return proof def vaild_chain(self, chain: List[Dict[str, Any]]) -> bool: """验证链是否合理:最长且有效 Args: chain (List[Dict[str, Any]]): 传入链 Returns: bool: 返回是否有效 """ last_block = chain[0] # 从第一个创世区块开始遍历验证 current_index = 1 while current_index < len(chain): block = chain[current_index] print(f'{last_block}') print(f'{block}') print("\n-----------\n") # 如果当前区块的前哈希和前一个计算出来的哈希值不同则是无效链 if block['previous_hash'] != self.hash(last_block): return False # 检验工作量证明是否符合要求 if not self.valid_proof(last_block['proof'], block['proof']): return False last_block = block current_index += 1 return True def valid_proof(self, last_proof: int, proof: int) -> bool: """工作量证明验证,验证计算结果是否以2个0开头 Args: last_proof (int): 前工作证明 proof (int): 当前工作证明 Returns: bool: 返回验证是否有效 """ guess = f'{last_proof}{proof}'.encode() guess_hash = hashlib.sha256(guess).hexdigest() # print(guess_hash) 输出计算过程 if guess_hash[0:2] == "00": return True else: return False def resolve_conflicts(self) -> bool: """共识算法,解决冲突,以最长且有效的链为主 Returns: bool: 冲突是否解决成功 """ neighbours = self.nodes # 获取节点信息 new_chain = None # 定义可能的新链 max_length = len(self.chain) # 获取当前链长度 for node in neighbours: # 获取节点的链条信息,如果更长且有效则直接替换 response = requests.get(f'http://{node}/chain') if response.status_code == 200: length = response.json()['length'] chain = response.json()['chain'] if length > max_length and self.vaild_chain(chain): max_length = length new_chain = chain if new_chain: self.chain = new_chain return True return False app = Flask(__name__) # flask框架 node_identifier = str(uuid4()).replace('-', '') # 使者获取一个唯一的uid blockchain = Blockchain() # 添加新建交易接口 @app.route('/transactions/new', methods=['POST']) def new_transaction(): values = request.get_json() required = ['sender', 'recipient', 'amount'] if not all(k in values for k in required): return '确少参数', 400 index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount']) response = {'message': f'交易将会被添加到块 {index}'} return jsonify(response), 201 # 添加新建打包区块接口 @app.route('/mine', methods=['GET']) def mine(): last_block = blockchain.last_block # 获取链上最后一个区块的信息 last_proof = last_block['proof'] proof = blockchain.proof_of_work(last_proof) # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励 blockchain.new_transaction( sender="0", recipient=node_identifier, amount=1, ) block = blockchain.new_block(proof, None) # 生成一个新块 response = { 'message': "打包成功,新区块已生成!", 'index': block['index'], 'transactions': block['transactions'], 'proof': block['proof'], 'previous_hash': block['previous_hash'], } return jsonify(response), 200 # 查看链接口 @app.route('/chain', methods=['GET']) def full_chain(): response = { 'chain': blockchain.chain, 'length': len(blockchain.chain), } return jsonify(response), 200 # 节点注册接口 @app.route('/nodes/register', methods=['POST']) def register_nodes(): values = request.get_json() nodes = values.get('nodes') if nodes is None: return "Error: 请提供一个符合规则的节点", 400 for node in nodes: blockchain.register_node(node) response = { 'message': '新节点已经被添加!', 'total_nodes': list(blockchain.nodes), } return jsonify(response), 201 # 共识接口 @app.route('/nodes/resolve', methods=['GET']) def consensus(): replaced = blockchain.resolve_conflicts() if replaced: response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain} else: response = {'message': '当前链符合要求', 'chain': blockchain.chain} return jsonify(response), 200 if __name__ == "__main__": parser = ArgumentParser() # 命令行参数解析,端口默认5000 parser.add_argument('-p', '--port', default=5000, type=int, help='port to listen on') args = parser.parse_args() port = args.port app.run(host='127.0.0.1', port=port) # 启动web服务,默认本机
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。