commodity_backingtrack_system/app/blockchain/clients/client3/blockchain.py

201 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
from typing import Dict, Any
class BlockChain:
def __init__(self):
self.chain = []
print("正在从文件中加载区块链信息...")
chain_json = self.load_file()
if chain_json["length"] == 0 or self.valid_chain(chain_json["chain"]) is False:
# 创建创世块
print("文件验证失败,即将生成新的区块链")
self.new_block({}, proof=100)
else:
print("文件验证成功")
self.chain = chain_json["chain"]
def load_file(self):
try:
with open("blockchain.json", "r", encoding="utf-8") as fr:
data = json.load(fr)
return data
except FileNotFoundError as e:
self.update_file()
with open("blockchain.json", "r", encoding="utf-8") as fr:
data = json.load(fr)
return data
def update_file(self):
with open("blockchain.json", "w", encoding="utf-8") as fw:
json.dump(self.full_chain(), fw, indent=4)
print("保存成功")
def valid_chain(self, chain) -> bool:
"""
验证区块链的信息是否正确
:param chain: 一个区块链
:return: 正确返回True否则返回False
"""
last_block = chain[0]
current_index = 1
while current_index < len(chain):
block = chain[current_index]
# check that the hash of the block is correct
if block['previous_hash'] != self.hash(last_block):
print("前置哈希验证失败")
return False
# Check that the hash of Work is correct
if not self.valid_proof(current_index + 1, block["content"], block["proof"], block["previous_hash"]):
print("工作量证明验证失败")
return False
last_block = block
current_index += 1
print("区块链校验通过")
return True
def new_block(self, data, proof, index=None) -> Dict[str, Any]:
"""
生成新块
:param data: 区块中保存的数据
:param proof: 计算得到的工作量证明
:param index: 新区块的索引
:return: 新的区块
"""
if index is None:
index = len(self.chain) + 1
previous_hash = "0000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
if self.last_block:
previous_hash = self.hash(self.last_block)
block = {
"index": index,
"content": data,
"proof": proof,
"previous_hash": previous_hash
}
temp_chain = list(self.chain[0:index])
temp_chain.append(block)
if self.valid_chain(temp_chain):
self.chain = list(temp_chain)
print("新区块创建并验证成功,正在保存")
self.update_file()
return block
@staticmethod
def new_data(op_id, operator, option, commodity, time):
"""
生成新操作信息,信息将加入到下一个待挖的区块中
:param time: 操作时间
:param op_id: 操作id
:param operator: 商品的操作者
:param option: 操作类型
:param commodity: 被操作的商品
:return: 创建好的操作信息
"""
data = (
{
"id": op_id,
"operator": operator,
"option": option,
"commodity": commodity,
"time": time
}
)
return data
@staticmethod
def hash(block) -> str:
"""
生成块的 SHA-256 hash值
:param block: Block
:return: 哈希值
"""
# We must make sure that the Dictionary is Ordered, or we'll have inconsistent hashes
block_string = json.dumps(block, sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()
@property
def last_block(self):
if len(self.chain) >= 1:
return self.chain[-1]
else:
return None
def proof_of_work(self, block_data) -> int:
"""
简单的工作量证明:
- 查找一个 p' 使得 hash(pp') 以4个0开头
- p 是上一个块的证明p' 是当前的证明
:param block_data: 上一个证明
:return: 查找到的证明
"""
guss_proof = 0
while self.valid_proof(len(self.chain) + 1, block_data, guss_proof) is False:
guss_proof += 1
if guss_proof % 1000 == 0:
print(guss_proof, end=",")
if guss_proof % 10000 == 0:
print("\n")
return guss_proof
def valid_proof(self, index, data, proof: int, previous_hash=None) -> bool:
"""
验证证明: 是否hash(last_proof, proof)以4个0开头
:param previous_hash:
:param index:
:param data: 前一个区块的哈希值
:param proof: Current Proof
:return: True if correct, False if not.
"""
if previous_hash is None:
previous_hash = self.hash(self.last_block)
block = {
"index": index,
"content": data,
"proof": proof,
"previous_hash": previous_hash
}
guess_hash = self.hash(block)
# print(guess_hash)
if guess_hash.startswith("0000"):
return True
else:
return False
def full_chain(self):
data = {
'chain': self.chain,
'length': len(self.chain)
}
return data
def create_chain():
op_data = chain.new_data(2, "操作者", "操作类型", "商品", "time")
op_data = [chain.new_data(2, "操作者", "操作类型", "商品", "time"), chain.new_data(2, "操作者", "操作类型", "商品", "time"),
chain.new_data(2, "操作者", "操作类型", "商品", "time")]
new_proof = chain.proof_of_work(op_data)
chain.new_block(data=op_data, proof=2321084, index=4)
if __name__ == '__main__':
chain = BlockChain()
create_chain()
temp = json.dumps(chain.full_chain())
print(temp)