哈希算法(Hash Algorithm)是一种将输入数据映射到固定大小的输出(通常是一个整数或字符串)的算法。哈希算法广泛应用于数据结构(如哈希表)、加密、数据校验等领域。下面将详细介绍哈希算法的基本原理,并通过具体案例展示如何在Python中实现和应用哈希算法。
def simple_hash(data):
    """Return the sum of the Unicode code points of the characters in ``data``.

    This is a deliberately naive hash: it ignores character order, so any
    two strings made of the same characters (e.g. ``"ab"`` and ``"ba"``)
    collide.

    Args:
        data: An iterable of single-character strings, typically a ``str``.

    Returns:
        The integer sum of ``ord(char)`` over ``data``; ``0`` when empty.
    """
    return sum(ord(char) for char in data)
# Exercise the simple hash function.
print(simple_hash("hello"))  # Output: 532
print(simple_hash("world"))  # Output: 552
print(simple_hash("hello world"))  # Output: 1116
def improved_hash(data, prime=31, modulus=None):
    """Return a polynomial rolling hash of ``data``.

    Each character is folded in as ``hash = hash * prime + ord(char)``, so
    unlike a plain sum the result depends on character order.

    Args:
        data: An iterable of single-character strings, typically a ``str``.
        prime: Multiplier applied before each character is added.
        modulus: Optional positive integer. When given, the result is
            reduced modulo this value so the output has a bounded size;
            when ``None`` the value grows without bound with ``len(data)``.

    Returns:
        The hash as a non-negative ``int``; ``0`` for empty input.

    Raises:
        ValueError: If ``modulus`` is given but is not positive.
    """
    if modulus is not None and modulus <= 0:
        raise ValueError("modulus must be a positive integer")

    hash_value = 0
    for char in data:
        hash_value = hash_value * prime + ord(char)
        if modulus is not None:
            # Reducing at every step yields the same residue as reducing
            # once at the end, while keeping the intermediate value small.
            hash_value %= modulus
    return hash_value
# Exercise the improved hash function.
print(improved_hash("hello"))  # Output: 99162322
print(improved_hash("world"))  # Output: 113318802
print(improved_hash("hello world"))  # Output: 88006926820958916
class HashTable:
    """Fixed-capacity hash table that resolves collisions with bucket lists.

    Keys must be iterables of single characters (in practice ``str``)
    because the slot index is derived from ``ord`` of each character.
    Slots start as ``None`` and become a list of ``[key, value]`` pairs on
    the first insert that lands there.
    """

    def __init__(self, size=100):
        """Create an empty table with ``size`` slots.

        Raises:
            ValueError: If ``size`` is not positive. The slot index is
                computed modulo ``size``, so a table needs at least one slot.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")
        self.size = size
        self.table = [None] * size

    def _hash(self, key):
        """Map ``key`` to a slot index in ``range(self.size)``."""
        hash_value = 0
        prime = 31
        for char in key:
            hash_value = hash_value * prime + ord(char)
        return hash_value % self.size

    def insert(self, key, value):
        """Store ``value`` under ``key``, replacing any existing value."""
        index = self._hash(key)
        if self.table[index] is None:
            self.table[index] = []
        bucket = self.table[index]
        # Update in place when the key is already present.
        for item in bucket:
            if item[0] == key:
                item[1] = value
                return
        bucket.append([key, value])

    def search(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        bucket = self.table[self._hash(key)]
        if bucket is None:
            return None
        for item in bucket:
            if item[0] == key:
                return item[1]
        return None

    def delete(self, key):
        """Remove ``key`` if present; do nothing otherwise."""
        bucket = self.table[self._hash(key)]
        if bucket is None:
            return
        for i, item in enumerate(bucket):
            if item[0] == key:
                # Safe to delete while iterating because we return at once.
                del bucket[i]
                return
# Exercise the hash table.
hash_table = HashTable()
# Insert key/value pairs.
hash_table.insert("name", "Alice")
hash_table.insert("age", "25")
hash_table.insert("city", "New York")
# Look keys up.
print(hash_table.search("name"))  # Output: Alice
print(hash_table.search("age"))  # Output: 25
print(hash_table.search("city"))  # Output: New York
# Delete a key.
hash_table.delete("age")
print(hash_table.search("age"))  # Output: None
- import hashlib
# Stored format: "pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>".
_PBKDF2_SCHEME = "pbkdf2_sha256"
_PBKDF2_DEFAULT_ITERATIONS = 600_000
_PBKDF2_SALT_BYTES = 16


def hash_password(password, iterations=_PBKDF2_DEFAULT_ITERATIONS):
    """Return a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    A single unsalted SHA-256 round is fast to brute-force and gives equal
    passwords equal hashes, so a fresh random salt and a configurable
    iteration count are used instead. Both are embedded in the returned
    string so ``verify_password`` needs no extra state.

    Args:
        password: The plaintext password as ``str``.
        iterations: PBKDF2 iteration count (must be at least 1).

    Returns:
        A string of the form
        ``"pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>"``. The value
        differs between calls because the salt is random.
    """
    import secrets

    salt = secrets.token_bytes(_PBKDF2_SALT_BYTES)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{_PBKDF2_SCHEME}${iterations}${salt.hex()}${digest.hex()}"


def verify_password(stored_hash, password):
    """Return ``True`` if ``password`` matches ``stored_hash``.

    Accepts both the salted format produced by ``hash_password`` and a
    legacy bare SHA-256 hex digest, so previously stored hashes keep
    verifying. Comparisons are constant-time.

    Args:
        stored_hash: The stored hash string.
        password: The plaintext candidate password.

    Returns:
        ``True`` on a match; ``False`` on a mismatch or when a salted
        ``stored_hash`` is malformed.
    """
    import hmac

    candidate = password.encode()

    if stored_hash.startswith(_PBKDF2_SCHEME + "$"):
        try:
            _, iterations_text, salt_hex, digest_hex = stored_hash.split("$")
            iterations = int(iterations_text)
            salt = bytes.fromhex(salt_hex)
            expected = bytes.fromhex(digest_hex)
        except ValueError:
            # Wrong field count or non-numeric / non-hex fields.
            return False
        if iterations < 1:
            return False
        actual = hashlib.pbkdf2_hmac("sha256", candidate, salt, iterations)
        return hmac.compare_digest(actual, expected)

    # Legacy format: unsalted SHA-256 hex digest. Compare as bytes because
    # compare_digest rejects non-ASCII str arguments.
    legacy = hashlib.sha256(candidate).hexdigest()
    return hmac.compare_digest(stored_hash.encode(), legacy.encode())
# Example password.
password = "securepassword"
hashed_password = hash_password(password)
print("Hashed Password:", hashed_password)
# Verify the password.
print(verify_password(hashed_password, "securepassword"))  # Output: True
print(verify_password(hashed_password, "wrongpassword"))  # Output: False
在哈希表中,当多个不同的键被映射到同一个位置(即哈希后得到相同的索引)时,就会发生碰撞(冲突)。常见的碰撞处理方法有两种:链地址法(Separate Chaining)和开放地址法(Open Addressing)。
class HashTableChaining:
    """Hash table that resolves collisions by separate chaining.

    Every slot holds a list of ``[key, value]`` pairs; keys that hash to
    the same slot share that list. Keys must be iterables of single
    characters (in practice ``str``).
    """

    def __init__(self, size=100):
        """Create an empty table with ``size`` pre-allocated buckets.

        Raises:
            ValueError: If ``size`` is not positive. The slot index is
                computed modulo ``size``, so a table needs at least one slot.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")
        self.size = size
        # One independent list per slot (not ``[[]] * size``, which would
        # alias a single list across all slots).
        self.table = [[] for _ in range(size)]

    def _hash(self, key):
        """Map ``key`` to a slot index in ``range(self.size)``."""
        hash_value = 0
        prime = 31
        for char in key:
            hash_value = hash_value * prime + ord(char)
        return hash_value % self.size

    def insert(self, key, value):
        """Store ``value`` under ``key``, replacing any existing value."""
        bucket = self.table[self._hash(key)]
        for item in bucket:
            if item[0] == key:
                item[1] = value
                return
        bucket.append([key, value])

    def search(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        for item in self.table[self._hash(key)]:
            if item[0] == key:
                return item[1]
        return None

    def delete(self, key):
        """Remove ``key`` if present; do nothing otherwise."""
        bucket = self.table[self._hash(key)]
        for i, item in enumerate(bucket):
            if item[0] == key:
                # Safe to delete while iterating because we return at once.
                del bucket[i]
                return
# Exercise the separate-chaining hash table.
hash_table = HashTableChaining()
hash_table.insert("name", "Alice")
hash_table.insert("age", "25")
hash_table.insert("city", "New York")
print(hash_table.search("name"))  # Output: Alice
print(hash_table.search("age"))  # Output: 25
print(hash_table.search("city"))  # Output: New York
hash_table.delete("age")
print(hash_table.search("age"))  # Output: None
class HashTableOpenAddressing:
    """Hash table that resolves collisions by linear probing.

    Each slot is ``None`` (never used), a tombstone (an entry was deleted
    there), or a ``(key, value)`` tuple. Tombstones keep probe sequences
    intact: resetting a deleted slot to ``None`` would make every key that
    was placed past it unreachable. Keys must be iterables of single
    characters (in practice ``str``).
    """

    # Marks a slot whose entry was deleted; probing continues past it.
    _DELETED = object()

    def __init__(self, size=100):
        """Create an empty table with ``size`` slots.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")
        self.size = size
        self.table = [None] * size

    def _hash(self, key):
        """Map ``key`` to its home slot index in ``range(self.size)``."""
        hash_value = 0
        prime = 31
        for char in key:
            hash_value = hash_value * prime + ord(char)
        return hash_value % self.size

    def _probe(self, key):
        """Yield every slot index once, starting at the key's home slot."""
        start = self._hash(key)
        for offset in range(self.size):
            yield (start + offset) % self.size

    def insert(self, key, value):
        """Store ``value`` under ``key``, replacing any existing value.

        Raises:
            Exception: With message ``"HashTable is full"`` when the key is
                not present and no free or deleted slot is available.
        """
        first_free = None
        for slot in self._probe(key):
            entry = self.table[slot]
            if entry is None:
                # End of the probe chain: the key cannot be further along.
                if first_free is None:
                    first_free = slot
                break
            if entry is self._DELETED:
                # Remember the slot for reuse, but keep scanning: the key
                # may still live later in the chain and must be updated
                # there rather than duplicated here.
                if first_free is None:
                    first_free = slot
            elif entry[0] == key:
                self.table[slot] = (key, value)
                return
        if first_free is None:
            raise Exception("HashTable is full")
        self.table[first_free] = (key, value)

    def search(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        for slot in self._probe(key):
            entry = self.table[slot]
            if entry is None:
                return None
            if entry is not self._DELETED and entry[0] == key:
                return entry[1]
        return None

    def delete(self, key):
        """Remove ``key`` if present; do nothing otherwise."""
        for slot in self._probe(key):
            entry = self.table[slot]
            if entry is None:
                return
            if entry is not self._DELETED and entry[0] == key:
                self.table[slot] = self._DELETED
                return
# Exercise the open-addressing hash table.
hash_table = HashTableOpenAddressing()
hash_table.insert("name", "Alice")
hash_table.insert("age", "25")
hash_table.insert("city", "New York")
print(hash_table.search("name"))  # Output: Alice
print(hash_table.search("age"))  # Output: 25
print(hash_table.search("city"))  # Output: New York
hash_table.delete("age")
print(hash_table.search("age"))  # Output: None
- from bitarray import bitarray
- import mmh3
class BloomFilter:
    """Bloom filter backed by a ``bitarray`` and ``mmh3`` hashes.

    ``check`` may report items that were never added (false positives),
    because unrelated items can set the same bits, but it never reports an
    added item as missing.
    """

    def __init__(self, size, hash_count):
        """Create an empty filter.

        Args:
            size: Number of bits in the filter.
            hash_count: Number of bit positions derived per item.

        Raises:
            ValueError: If ``size`` or ``hash_count`` is not positive. With
                zero hashes ``check`` would report every item as present,
                and with zero bits the modulo in ``add`` would fail.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")
        if hash_count <= 0:
            raise ValueError("hash_count must be a positive integer")
        self.size = size
        self.hash_count = hash_count
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)

    def _positions(self, item):
        """Yield the ``hash_count`` bit positions for ``item``."""
        for i in range(self.hash_count):
            # The loop index is passed as the second argument of mmh3.hash
            # (its seed, per the mmh3 docs) to get a different hash each
            # round. Python's % with a positive modulus is non-negative
            # even if the hash itself is negative.
            yield mmh3.hash(item, i) % self.size

    def add(self, item):
        """Record ``item`` in the filter."""
        for position in self._positions(item):
            self.bit_array[position] = 1

    def check(self, item):
        """Return ``False`` if ``item`` was definitely not added, else ``True``."""
        for position in self._positions(item):
            if self.bit_array[position] == 0:
                return False
        return True
# Exercise the Bloom filter.
bf = BloomFilter(500, 7)
items_to_add = ["apple", "banana", "cherry", "date"]
items_to_check = ["apple", "banana", "grape", "orange"]
for item in items_to_add:
    bf.add(item)
for item in items_to_check:
    print(f"{item}: {bf.check(item)}")
哈希函数在区块链中有重要应用,尤其是在数据完整性、工作量证明(Proof of Work)以及生成区块哈希等方面。区块链通过哈希函数确保数据不可篡改。
- import hashlib
- import json
- from time import time
class Block:
    """A single block: index, link to the previous block, payload and proof."""

    def __init__(self, index, previous_hash, transactions, proof, timestamp=None):
        """Create a block.

        Args:
            index: Position of the block in the chain.
            previous_hash: Hash of the preceding block.
            transactions: JSON-serialisable transaction payload.
            proof: Proof-of-work value associated with this block.
            timestamp: Creation time; defaults to the current ``time()``
                only when omitted (``None``), so an explicit ``0`` is kept.
        """
        self.index = index
        self.previous_hash = previous_hash
        self.transactions = transactions
        self.proof = proof
        self.timestamp = time() if timestamp is None else timestamp

    def compute_hash(self):
        """Return the SHA-256 hex digest of the block's attributes.

        The attributes are serialised as JSON with sorted keys so the same
        contents always produce the same digest.
        """
        block_string = json.dumps(self.__dict__, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()
class Blockchain:
    """Minimal blockchain with pending transactions and proof of work.

    A proof is valid when the SHA-256 hex digest of
    ``f'{last_proof}{proof}'`` starts with ``difficulty`` zero characters.
    """

    # Default number of leading zero hex digits required of a valid proof.
    difficulty = 4

    def __init__(self, difficulty=4):
        """Create a chain containing only the genesis block.

        Args:
            difficulty: Required count of leading ``"0"`` hex digits.

        Raises:
            ValueError: If ``difficulty`` is negative.
        """
        if difficulty < 0:
            raise ValueError("difficulty must be non-negative")
        self.difficulty = difficulty
        self.chain = []
        self.current_transactions = []
        self.create_block(proof=1, previous_hash='0')

    def create_block(self, proof, previous_hash):
        """Append a block holding the pending transactions and return it."""
        block = Block(
            index=len(self.chain) + 1,
            previous_hash=previous_hash,
            transactions=self.current_transactions,
            proof=proof,
        )
        # Rebind (not clear) so the new block keeps its own list.
        self.current_transactions = []
        self.chain.append(block)
        return block

    def get_last_block(self):
        """Return the most recently appended block."""
        return self.chain[-1]

    def add_transaction(self, sender, recipient, amount):
        """Queue a transaction; return the index of the block that will hold it."""
        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
        })
        return self.get_last_block().index + 1

    def proof_of_work(self, last_proof):
        """Return the smallest non-negative proof valid against ``last_proof``."""
        proof = 0
        while not self.valid_proof(last_proof, proof):
            proof += 1
        return proof

    def valid_proof(self, last_proof, proof):
        """Return ``True`` if ``proof`` satisfies the difficulty target."""
        guess = f'{last_proof}{proof}'.encode()
        guess_hash = hashlib.sha256(guess).hexdigest()
        return guess_hash.startswith("0" * self.difficulty)

    def add_block(self, proof):
        """Append a block linked to the current last block's hash."""
        previous_hash = self.get_last_block().compute_hash()
        return self.create_block(proof, previous_hash)
# Create a blockchain and append one mined block.
blockchain = Blockchain()
blockchain.add_transaction(sender="Alice", recipient="Bob", amount=50)
last_proof = blockchain.get_last_block().proof
proof = blockchain.proof_of_work(last_proof)
blockchain.add_block(proof)
for block in blockchain.chain:
    print(f"Block {block.index}: {block.compute_hash()}")
一致性哈希(Consistent Hashing)是一种分布式系统中的哈希技术,用于在分布式节点间均匀分配数据,特别适合动态添加或删除节点的场景。
- import hashlib
- import bisect
class ConsistentHash:
    """Consistent-hash ring with virtual nodes.

    Each node is placed on the ring ``replicas`` times (hashing
    ``"<node>:<i>"``); a key is served by the first ring position after
    the key's own hash, wrapping around at the end.
    """

    def __init__(self, nodes=None, replicas=3):
        """Build a ring, optionally pre-populated with ``nodes``."""
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes or ():
            self.add_node(node)

    def _hash(self, key):
        """Return the MD5 digest of ``key`` as an integer ring position."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Place ``node`` on the ring; re-adding an existing node is a no-op."""
        for i in range(self.replicas):
            key = self._hash(f'{node}:{i}')
            # Only insert positions that are new. A duplicate entry in
            # sorted_keys would survive remove_node and later make
            # get_node look up a position that is no longer in the ring.
            if key not in self.ring:
                bisect.insort(self.sorted_keys, key)
            self.ring[key] = node

    def remove_node(self, node):
        """Remove all of ``node``'s positions; unknown nodes are ignored."""
        for i in range(self.replicas):
            key = self._hash(f'{node}:{i}')
            if key in self.ring:
                del self.ring[key]
                # sorted_keys is sorted and holds each position once, so
                # bisect_left lands exactly on the entry to drop.
                del self.sorted_keys[bisect.bisect_left(self.sorted_keys, key)]

    def get_node(self, key):
        """Return the node responsible for ``key``, or ``None`` if the ring is empty."""
        if not self.ring:
            return None
        hash_key = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            # Past the last position: wrap around to the start of the ring.
            idx = 0
        return self.ring[self.sorted_keys[idx]]
# Exercise consistent hashing.
nodes = ["node1", "node2", "node3"]
ch = ConsistentHash(nodes)
print("Node for key 'my_key1':", ch.get_node("my_key1"))
print("Node for key 'my_key2':", ch.get_node("my_key2"))
print("Node for key 'my_key3':", ch.get_node("my_key3"))
# Add a node.
ch.add_node("node4")
print("Node for key 'my_key1' after adding node4:", ch.get_node("my_key1"))
# Remove a node.
ch.remove_node("node2")
print("Node for key 'my_key2' after removing node2:", ch.get_node("my_key2"))
- import numpy as np
class LSH:
    """Locality-sensitive hashing with random-hyperplane (sign) hashes.

    Each table owns a ``(num_hashes, input_dim)`` matrix of random
    vectors. A vector's bucket in a table is the tuple of 0/1 flags
    telling whether its dot product with each random vector is positive.
    """

    def __init__(self, input_dim, num_tables, num_hashes, seed=None):
        """Create ``num_tables`` empty tables with random projections.

        Args:
            input_dim: Length of the vectors that will be hashed.
            num_tables: Number of independent hash tables.
            num_hashes: Number of sign bits per bucket key.
            seed: Optional seed for reproducible projections. When
                ``None`` the global ``np.random`` state is used.
        """
        self.num_tables = num_tables
        self.num_hashes = num_hashes
        self.hash_tables = [{} for _ in range(num_tables)]
        if seed is None:
            self.random_vectors = [
                np.random.randn(num_hashes, input_dim) for _ in range(num_tables)
            ]
        else:
            rng = np.random.default_rng(seed)
            self.random_vectors = [
                rng.standard_normal((num_hashes, input_dim))
                for _ in range(num_tables)
            ]

    def _hash(self, x, random_vectors):
        """Return the bucket key of ``x`` as a tuple of 0/1 sign bits."""
        return tuple((np.dot(random_vectors, x) > 0).astype(int))

    def add(self, x, label):
        """Index ``label`` under the bucket of ``x`` in every table."""
        for table, random_vectors in zip(self.hash_tables, self.random_vectors):
            bucket_key = self._hash(x, random_vectors)
            table.setdefault(bucket_key, []).append(label)

    def query(self, x):
        """Return the set of labels sharing a bucket with ``x`` in any table."""
        results = set()
        for table, random_vectors in zip(self.hash_tables, self.random_vectors):
            results.update(table.get(self._hash(x, random_vectors), ()))
        return results
# Exercise LSH.
data = np.random.randn(100, 128)
labels = np.arange(100)
lsh = LSH(input_dim=128, num_tables=5, num_hashes=10)
for x, label in zip(data, labels):
    lsh.add(x, label)
query = data[0]
print("Similar items to query:", lsh.query(query))
- import hashlib
class HashChain:
    """Append-only list of blocks where each hash covers the previous hash.

    Every entry is ``{'data': ..., 'hash': ...}`` with
    ``hash = SHA-256(data + previous_hash)``; the first block uses ``'0'``
    as its previous hash.
    """

    def __init__(self):
        """Create an empty chain."""
        self.chain = []

    def add_block(self, data):
        """Append ``data`` (a ``str``) linked to the current last block."""
        previous_hash = self.chain[-1]['hash'] if self.chain else '0'
        block_hash = self._hash(data + previous_hash)
        self.chain.append({'data': data, 'hash': block_hash})

    def _hash(self, data):
        """Return the SHA-256 hex digest of the string ``data``."""
        return hashlib.sha256(data.encode()).hexdigest()

    def verify_chain(self):
        """Return ``True`` if every block's hash matches its data and link.

        Verification starts at the first block, using the same ``'0'``
        seed as ``add_block``, so tampering with the first block's data is
        detected as well. An empty chain is valid.
        """
        previous_hash = '0'
        for block in self.chain:
            if self._hash(block['data'] + previous_hash) != block['hash']:
                return False
            previous_hash = block['hash']
        return True
# Exercise the hash chain.
hash_chain = HashChain()
hash_chain.add_block("Block 1")
hash_chain.add_block("Block 2")
hash_chain.add_block("Block 3")
print("Hash Chain:", hash_chain.chain)
print("Is chain valid?", hash_chain.verify_chain())
# Tamper with the data.
hash_chain.chain[1]['data'] = "Tampered Block 2"
print("Is chain valid after tampering?", hash_chain.verify_chain())
