201 lines
6.1 KiB
Python
201 lines
6.1 KiB
Python
import hashlib
|
||
import json
|
||
from typing import Dict, Any
|
||
|
||
|
||
class BlockChain:
|
||
def __init__(self):
|
||
self.chain = []
|
||
print("正在从文件中加载区块链信息...")
|
||
chain_json = self.load_file()
|
||
if chain_json["length"] == 0 or self.valid_chain(chain_json["chain"]) is False:
|
||
# 创建创世块
|
||
print("文件验证失败,即将生成新的区块链")
|
||
self.new_block({}, proof=100)
|
||
else:
|
||
print("文件验证成功")
|
||
self.chain = chain_json["chain"]
|
||
|
||
def load_file(self):
|
||
try:
|
||
with open("blockchain.json", "r", encoding="utf-8") as fr:
|
||
data = json.load(fr)
|
||
return data
|
||
except FileNotFoundError as e:
|
||
self.update_file()
|
||
with open("blockchain.json", "r", encoding="utf-8") as fr:
|
||
data = json.load(fr)
|
||
return data
|
||
|
||
def update_file(self):
|
||
with open("blockchain.json", "w", encoding="utf-8") as fw:
|
||
json.dump(self.full_chain(), fw, indent=4)
|
||
print("保存成功")
|
||
|
||
def valid_chain(self, chain) -> bool:
|
||
"""
|
||
验证区块链的信息是否正确
|
||
|
||
:param chain: 一个区块链
|
||
:return: 正确返回True,否则返回False
|
||
"""
|
||
|
||
last_block = chain[0]
|
||
current_index = 1
|
||
|
||
while current_index < len(chain):
|
||
block = chain[current_index]
|
||
# check that the hash of the block is correct
|
||
if block['previous_hash'] != self.hash(last_block):
|
||
print("前置哈希验证失败")
|
||
return False
|
||
# Check that the hash of Work is correct
|
||
if not self.valid_proof(current_index + 1, block["content"], block["proof"], block["previous_hash"]):
|
||
print("工作量证明验证失败")
|
||
return False
|
||
|
||
last_block = block
|
||
current_index += 1
|
||
print("区块链校验通过")
|
||
return True
|
||
|
||
def new_block(self, data, proof, index=None) -> Dict[str, Any]:
|
||
|
||
"""
|
||
生成新块
|
||
|
||
:param data: 区块中保存的数据
|
||
:param proof: 计算得到的工作量证明
|
||
:param index: 新区块的索引
|
||
:return: 新的区块
|
||
"""
|
||
if index is None:
|
||
index = len(self.chain) + 1
|
||
previous_hash = "0000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||
if self.last_block:
|
||
previous_hash = self.hash(self.last_block)
|
||
|
||
block = {
|
||
"index": index,
|
||
"content": data,
|
||
"proof": proof,
|
||
"previous_hash": previous_hash
|
||
}
|
||
temp_chain = list(self.chain[0:index])
|
||
temp_chain.append(block)
|
||
if self.valid_chain(temp_chain):
|
||
self.chain = list(temp_chain)
|
||
print("新区块创建并验证成功,正在保存")
|
||
self.update_file()
|
||
|
||
return block
|
||
|
||
@staticmethod
|
||
def new_data(op_id, operator, option, commodity, time):
|
||
"""
|
||
生成新操作信息,信息将加入到下一个待挖的区块中
|
||
|
||
:param time: 操作时间
|
||
:param op_id: 操作id
|
||
:param operator: 商品的操作者
|
||
:param option: 操作类型
|
||
:param commodity: 被操作的商品
|
||
:return: 创建好的操作信息
|
||
"""
|
||
|
||
data = (
|
||
{
|
||
"id": op_id,
|
||
"operator": operator,
|
||
"option": option,
|
||
"commodity": commodity,
|
||
"time": time
|
||
}
|
||
)
|
||
return data
|
||
|
||
@staticmethod
|
||
def hash(block) -> str:
|
||
"""
|
||
生成块的 SHA-256 hash值
|
||
|
||
:param block: Block
|
||
:return: 哈希值
|
||
"""
|
||
|
||
# We must make sure that the Dictionary is Ordered, or we'll have inconsistent hashes
|
||
block_string = json.dumps(block, sort_keys=True).encode()
|
||
return hashlib.sha256(block_string).hexdigest()
|
||
|
||
@property
|
||
def last_block(self):
|
||
if len(self.chain) >= 1:
|
||
return self.chain[-1]
|
||
else:
|
||
return None
|
||
|
||
def proof_of_work(self, block_data) -> int:
|
||
"""
|
||
简单的工作量证明:
|
||
- 查找一个 p' 使得 hash(pp') 以4个0开头
|
||
- p 是上一个块的证明,p' 是当前的证明
|
||
:param block_data: 上一个证明
|
||
:return: 查找到的证明
|
||
"""
|
||
guss_proof = 0
|
||
while self.valid_proof(len(self.chain) + 1, block_data, guss_proof) is False:
|
||
guss_proof += 1
|
||
if guss_proof % 1000 == 0:
|
||
print(guss_proof, end=",")
|
||
if guss_proof % 10000 == 0:
|
||
print("\n")
|
||
|
||
return guss_proof
|
||
|
||
def valid_proof(self, index, data, proof: int, previous_hash=None) -> bool:
|
||
"""
|
||
验证证明: 是否hash(last_proof, proof)以4个0开头
|
||
:param previous_hash:
|
||
:param index:
|
||
:param data: 前一个区块的哈希值
|
||
:param proof: Current Proof
|
||
:return: True if correct, False if not.
|
||
"""
|
||
if previous_hash is None:
|
||
previous_hash = self.hash(self.last_block)
|
||
block = {
|
||
"index": index,
|
||
"content": data,
|
||
"proof": proof,
|
||
"previous_hash": previous_hash
|
||
}
|
||
guess_hash = self.hash(block)
|
||
# print(guess_hash)
|
||
if guess_hash.startswith("0000"):
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def full_chain(self):
|
||
data = {
|
||
'chain': self.chain,
|
||
'length': len(self.chain)
|
||
}
|
||
return data
|
||
|
||
|
||
def create_chain():
|
||
op_data = chain.new_data(2, "操作者", "操作类型", "商品", "time")
|
||
op_data = [chain.new_data(2, "操作者", "操作类型", "商品", "time"), chain.new_data(2, "操作者", "操作类型", "商品", "time"),
|
||
chain.new_data(2, "操作者", "操作类型", "商品", "time")]
|
||
new_proof = chain.proof_of_work(op_data)
|
||
chain.new_block(data=op_data, proof=2321084, index=4)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
chain = BlockChain()
|
||
|
||
create_chain()
|
||
temp = json.dumps(chain.full_chain())
|
||
print(temp)
|