我正在使用
RSA
算法来加密数据。前端使用 window.crypto
,后端使用 pycryptodome
。
但是,我发现前端加密payload后,后端无法解密。前端参考了
MDN
的示例代码:https://github.com/mdn/dom-examples/blob/main/web-crypto/import-key/spki.js,我的代码没有显示在现在是 GitHub。后端是从我自己编写的示例中复制的:https://github.com/zvms/rsa-bcrypt-jwt-login-eg/blob/main/cert.py。完整代码如下所示:
crpyto.ts
// Function to convert PEM formatted public key to a CryptoKey object
async function importPublicKey(pemKey: string) {
const withoutNewlines = pemKey
.replace('-----BEGIN PUBLIC KEY-----', '')
.replace('-----END PUBLIC KEY-----', '')
.split('\n')
.filter((line) => line.trim() !== '')
.join('')
console.log(withoutNewlines)
// Base64 decode the string to get the binary data
const binaryDerString = window.atob(withoutNewlines)
// Convert from a binary string to an ArrayBuffer
const binaryDer = str2ab(binaryDerString)
return window.crypto.subtle.importKey(
'spki',
binaryDer,
{
name: 'RSA-OAEP',
hash: 'SHA-256' // Specify the hash algorithm
},
true,
['encrypt']
)
}
// Utility function to convert a binary string to an ArrayBuffer
function str2ab(str: string) {
const buf = new ArrayBuffer(str.length)
const bufView = new Uint8Array(buf)
for (let i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i)
}
return buf
}
// Function to encrypt data using RSA-OAEP
async function encryptData(publicKey: CryptoKey, data: string) {
const encoder = new TextEncoder()
const encodedData = encoder.encode(data)
const encryptedData = await window.crypto.subtle.encrypt(
{
name: 'RSA-OAEP'
},
publicKey,
encodedData
)
return encryptedData
}
export { importPublicKey, encryptData }
auth.ts
async function UserLogin(user: string, password: string, term: 'long' | 'short' = 'long') {
const payload = JSON.stringify({
password: password,
time: Date.now()
})
const publicKey = await importPublicKey(await getRSAPublicCert())
const credential = await encryptData(publicKey, payload)
// console.log(credential)
const hex = byteArrayToHex(new Uint8Array(credential))
console.log(`User ${user} login with ${term} term, with the credential ${hex}`)
const result = (await axios('/user/auth', {
method: 'POST',
data: {
id: user.toString(),
credential: hex,
mode: term
}
})) as Response<LoginResult>
}
import bcrypt
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from fastapi import HTTPException
import jwt
import json
import datetime
from database import db
from bson import ObjectId
from bcrypt import checkpw
class Auth:
password: str
time: int
def hash_password(password):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
def check_password(password, hashed):
return bcrypt.checkpw(password.encode("utf-8"), hashed)
public_key = RSA.import_key(open("rsa_public_key.pem", "rb").read())
private_key = RSA.import_key(open("rsa_private_key.pem", "rb").read())
jwt_private_key = open("aes_key.txt", "r").read()
def rsa_encrypt(plaintext):
cipher = PKCS1_OAEP.new(public_key)
encrypt_text = cipher.encrypt(bytes(plaintext.encode("utf8")))
return encrypt_text.hex()
def rsa_decrypt(ciphertext):
cipher = PKCS1_OAEP.new(private_key)
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
return decrypt_text.decode("utf8")
def jwt_encode(id: str):
payload = {
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=60),
"iat": datetime.datetime.utcnow(),
"sub": id,
"scope": "access_token",
"type": "long-term",
}
return jwt.encode(payload, jwt_private_key, algorithm="HS256")
def jwt_decode(token):
return jwt.decode(token, jwt_private_key, algorithms=["HS256"], verify=True)
def validate_by_cert(id: str, cert: str):
auth_field = json.loads(rsa_decrypt(cert))
time = auth_field["time"]
# in a minute
if time > datetime.datetime.now().timestamp() + 60:
raise HTTPException(status_code=401, detail="Token expired")
if checkpwd(id, auth_field["password"]):
raise HTTPException(status_code=401, detail="Password incorrect")
return jwt_encode(id)
def checkpwd(id: str, pwd: str):
user = db.zvms.users.find_one({"_id": ObjectId(id)})
if checkpw(bytes(pwd, 'utf-8'), user.get('password')):
return True
return False
API 处理程序直接调用
validate_by_cert
方法。
直接使用
openssl
。
openssl genrsa -out rsa_private_key.pem 1024
openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem
openssl rand -hex 32 > aes_key.txt
调用此函数时返回
error
api
:
File "/Users/*/**/zvms/routers/users_router.py", line 43, in auth_user
result = validate_by_cert(id, credential)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 59, in validate_by_cert
auth_field = json.loads(rsa_decrypt(cert))
^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 39, in rsa_decrypt
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/anaconda3/envs/zvms/lib/python3.12/site-packages/Crypto/Cipher/PKCS1_OAEP.py", line 191, in decrypt
raise ValueError("Incorrect decryption.")
但是,当我调用原生
rsa_encrypt
方法并使用它来解密时,效果很好。
前端和后端可以成功传输加密数据。
Python 堆栈跟踪显示错误是在
rsa_decrypt()
中引起的。
RSA 解密失败,因为在 Python/PyCryptodome 代码中,默认值(即 SHA-1)被用作(OAEP 和 MGF1)摘要,请参阅
Crypto.Cipher.PKCS1_OAEP.new()
,hashAlgo
参数的描述,而在 JavaScript/应用 WebCrypto 代码 SHA-256(s. window.crypto.subtle.importKey()
调用)。
要在 Python 端也使用 SHA-256,请修复
rsa_decrypt()
:
from Crypto.Hash import SHA256
...
cipher = PKCS1_OAEP.new(private_key, hashAlgo = SHA256)
通过此更改,解密对我有用。