commodity_backingtrack_system/app/blockchain/clients/client3/blockchain.py

201 lines
6.1 KiB
Python
Raw Permalink Normal View History

2021-03-11 13:09:43 +08:00
import hashlib
import json
from typing import Dict, Any
class BlockChain:
def __init__(self):
self.chain = []
print("正在从文件中加载区块链信息...")
chain_json = self.load_file()
if chain_json["length"] == 0 or self.valid_chain(chain_json["chain"]) is False:
# 创建创世块
print("文件验证失败,即将生成新的区块链")
self.new_block({}, proof=100)
else:
print("文件验证成功")
self.chain = chain_json["chain"]
def load_file(self):
try:
with open("blockchain.json", "r", encoding="utf-8") as fr:
data = json.load(fr)
return data
except FileNotFoundError as e:
self.update_file()
with open("blockchain.json", "r", encoding="utf-8") as fr:
data = json.load(fr)
return data
def update_file(self):
with open("blockchain.json", "w", encoding="utf-8") as fw:
json.dump(self.full_chain(), fw, indent=4)
print("保存成功")
def valid_chain(self, chain) -> bool:
"""
验证区块链的信息是否正确
:param chain: 一个区块链
:return: 正确返回True否则返回False
"""
last_block = chain[0]
current_index = 1
while current_index < len(chain):
block = chain[current_index]
# check that the hash of the block is correct
if block['previous_hash'] != self.hash(last_block):
print("前置哈希验证失败")
return False
# Check that the hash of Work is correct
if not self.valid_proof(current_index + 1, block["content"], block["proof"], block["previous_hash"]):
print("工作量证明验证失败")
return False
last_block = block
current_index += 1
print("区块链校验通过")
return True
def new_block(self, data, proof, index=None) -> Dict[str, Any]:
"""
生成新块
:param data: 区块中保存的数据
:param proof: 计算得到的工作量证明
:param index: 新区块的索引
:return: 新的区块
"""
if index is None:
index = len(self.chain) + 1
previous_hash = "0000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
if self.last_block:
previous_hash = self.hash(self.last_block)
block = {
"index": index,
"content": data,
"proof": proof,
"previous_hash": previous_hash
}
temp_chain = list(self.chain[0:index])
temp_chain.append(block)
if self.valid_chain(temp_chain):
self.chain = list(temp_chain)
print("新区块创建并验证成功,正在保存")
self.update_file()
return block
@staticmethod
def new_data(op_id, operator, option, commodity, time):
"""
生成新操作信息信息将加入到下一个待挖的区块中
:param time: 操作时间
:param op_id: 操作id
:param operator: 商品的操作者
:param option: 操作类型
:param commodity: 被操作的商品
:return: 创建好的操作信息
"""
data = (
{
"id": op_id,
"operator": operator,
"option": option,
"commodity": commodity,
"time": time
}
)
return data
@staticmethod
def hash(block) -> str:
"""
生成块的 SHA-256 hash值
:param block: Block
:return: 哈希值
"""
# We must make sure that the Dictionary is Ordered, or we'll have inconsistent hashes
block_string = json.dumps(block, sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()
@property
def last_block(self):
if len(self.chain) >= 1:
return self.chain[-1]
else:
return None
def proof_of_work(self, block_data) -> int:
"""
简单的工作量证明
- 查找一个 p' 使得 hash(pp') 以4个0开头
- p 是上一个块的证明p' 是当前的证明
:param block_data: 上一个证明
:return: 查找到的证明
"""
guss_proof = 0
while self.valid_proof(len(self.chain) + 1, block_data, guss_proof) is False:
guss_proof += 1
if guss_proof % 1000 == 0:
print(guss_proof, end=",")
if guss_proof % 10000 == 0:
print("\n")
return guss_proof
def valid_proof(self, index, data, proof: int, previous_hash=None) -> bool:
"""
验证证明: 是否hash(last_proof, proof)以4个0开头
:param previous_hash:
:param index:
:param data: 前一个区块的哈希值
:param proof: Current Proof
:return: True if correct, False if not.
"""
if previous_hash is None:
previous_hash = self.hash(self.last_block)
block = {
"index": index,
"content": data,
"proof": proof,
"previous_hash": previous_hash
}
guess_hash = self.hash(block)
# print(guess_hash)
if guess_hash.startswith("0000"):
return True
else:
return False
def full_chain(self):
data = {
'chain': self.chain,
'length': len(self.chain)
}
return data
def create_chain():
op_data = chain.new_data(2, "操作者", "操作类型", "商品", "time")
op_data = [chain.new_data(2, "操作者", "操作类型", "商品", "time"), chain.new_data(2, "操作者", "操作类型", "商品", "time"),
chain.new_data(2, "操作者", "操作类型", "商品", "time")]
new_proof = chain.proof_of_work(op_data)
chain.new_block(data=op_data, proof=2321084, index=4)
if __name__ == '__main__':
chain = BlockChain()
create_chain()
temp = json.dumps(chain.full_chain())
print(temp)