从java到python的API调用移植(Kostal Plenticore Inverter)

2024-04-23 16:17:05 发布

您现在位置:Python中文网/ 问答频道 /正文


我想通过逆变器(Kostal PLENTICORE plus)的 API 读取传感器数据。由于 Kostal 没有提供相关文档,我一直没能调通,其中最大的难题是身份验证。不过我刚在 openHAB 的源码中找到了相关实现:
ThirdGenerationEncryptionHelper 和 ThirdGenerationHandler
现在我正在尝试将其尽可能轻松地移植到python。
我现在的代码:

    import requests
    import random
    import string
    import json
    import hashlib
    import hmac
    import hashlib
    import binascii

    def randomString(stringLength=10):
        """Return a random string of ``stringLength`` lowercase ASCII letters.

        The result is used as the client nonce of the authentication
        handshake, so the characters are drawn from the ``secrets`` CSPRNG
        instead of the predictable ``random`` generator.

        Args:
            stringLength: Number of characters to generate (default 10).

        Returns:
            A ``str`` of exactly ``stringLength`` characters taken from
            ``string.ascii_lowercase``; the empty string when the length is 0.
        """
        # Local import: the file's import list does not include ``secrets``.
        import secrets

        letters = string.ascii_lowercase
        return ''.join(secrets.choice(letters) for _index in range(stringLength))

    def getPBKDF2Hash(password, salt, rounds):
        """Derive a key from ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: Password as ``str``; it is encoded to UTF-8 bytes here.
            salt: Salt handed to ``hashlib.pbkdf2_hmac`` unchanged, so it must
                already be a bytes-like object.
            rounds: Iteration count, chosen by the caller.

        Returns:
            The derived key as ``bytes``.
        """
        password_bytes = password.encode('utf-8')
        return hashlib.pbkdf2_hmac('sha256', password_bytes, salt, rounds)

    def create_sha256_signature(byte_key, message):
        """Return the HMAC-SHA256 of ``message`` as an uppercase hex string.

        Args:
            byte_key: HMAC key as bytes.
            message: Text to authenticate; encoded with ``str.encode()``
                before hashing.

        Returns:
            The hexadecimal digest converted to upper case.
        """
        mac = hmac.new(byte_key, message.encode(), hashlib.sha256)
        hex_digest = mac.hexdigest()
        return hex_digest.upper()

    def createClientProof(clientSignature, serverSignature):
        """Return the base64-encoded byte-wise XOR of the two inputs.

        Python counterpart of the Java ``createClientProof`` helper: every
        byte of ``clientSignature`` is XOR-ed with the byte at the same
        position in ``serverSignature`` and the result is base64-encoded.

        Args:
            clientSignature: First operand, either raw ``bytes`` or a
                hexadecimal ``str`` (the format produced by
                ``create_sha256_signature``).
            serverSignature: Second operand, same accepted formats.

        Returns:
            The XOR result as a base64 ``str``.

        Raises:
            ValueError: If the operands differ in length or a ``str``
                operand is not valid hexadecimal.
        """
        # Local import: the file's import list does not include ``base64``.
        import base64

        def _as_bytes(value):
            # Hex text is converted to the bytes it represents; XOR-ing the
            # characters of the hex text itself would give a wrong proof.
            if isinstance(value, str):
                return bytes.fromhex(value)
            return bytes(value)

        left = _as_bytes(clientSignature)
        right = _as_bytes(serverSignature)
        if len(left) != len(right):
            raise ValueError("XOR operands must have the same length")

        xored = bytes(a ^ b for a, b in zip(left, right))
        return base64.b64encode(xored).decode('ascii')

    # --- Authentication handshake, step 1 (auth/start) and proof computation ---
    username = "user"
    password = "A123456789"
    url = 'http://192.168.1.23/api/v1/'
    # NOTE(review): confirm whether the inverter expects the nonce to be
    # base64-encoded before it is sent.
    clientNonce = randomString(16)
    reqstart = {"username": username, "nonce": clientNonce}

    a = requests.post(url+'auth/start', json=reqstart)
    anserstart = json.loads(a.text)

    serverNonce = anserstart['nonce']
    transactionId = anserstart['transactionId']
    salt = anserstart['salt']
    rounds = anserstart['rounds']

    # The salt arrives as base64 text; PBKDF2 needs the decoded raw bytes.
    saltedpassword = getPBKDF2Hash(password, binascii.a2b_base64(salt), rounds)
    # create_sha256_signature() returns uppercase hex text, so every value
    # that is used as a key or hashed again is converted back to raw bytes.
    clientkey = create_sha256_signature(saltedpassword, "Client Key")
    serverkey = create_sha256_signature(saltedpassword, "Server Key")
    storedKey = hashlib.sha256(binascii.unhexlify(clientkey)).digest()
    # str.format() returns a new string; the result has to be kept.
    # The auth message carries the salt exactly as received (base64 text).
    authMessage = "n={},r={},r={},s={},i={},c=biws,r={}".format(
        username, clientNonce, serverNonce, salt, rounds, serverNonce)
    clientSignature = create_sha256_signature(storedKey, authMessage)
    # The server signature is keyed with the server key over the same message.
    serverSignature = create_sha256_signature(binascii.unhexlify(serverkey), authMessage)

    print(anserstart)
    print(clientSignature)
    print(serverSignature)
    # The proof sent to the inverter is the client key XOR the client signature.
    print(createClientProof(clientSignature, clientkey))

    # TODO: POST {"proof": <proof>, "transactionId": transactionId} to
    # url + 'auth/finish' to complete the handshake.


现在说说我的问题:我卡在了生成客户端证明(createClientProof 函数)这一步。有人能帮我在 Python 中实现与 Java 代码相同的 XOR 运算吗?另外,我对加密和这类身份验证没有太多经验,有人能告诉我目前的做法是否正确吗?

原始 Java 代码:

/**
 * Computes the HMAC-SHA256 message authentication code of the given value.
 * Note that this is a keyed hash, not an encryption: the result cannot be
 * turned back into the input.
 *
 * @param password       key material used to key the HMAC
 * @param valueToEncrypt text to authenticate; it is converted to bytes as UTF-8
 * @return the raw HMAC bytes
 * @throws InvalidKeyException      thrown if the key generated from the password is invalid
 * @throws NoSuchAlgorithmException thrown if HMAC SHA 256 is not supported
 */
static byte[] getHMACSha256(byte[] password, String valueToEncrypt)
        throws InvalidKeyException, NoSuchAlgorithmException {
    SecretKeySpec signingKey = new SecretKeySpec(password, HMAC_SHA256_ALGORITHM);
    Mac mac = Mac.getInstance(HMAC_SHA256_ALGORITHM);
    mac.init(signingKey);
    // Use an explicit charset: the no-argument getBytes() depends on the
    // platform default, so the MAC could differ between machines.
    return mac.doFinal(valueToEncrypt.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}

/**
 * Generates the client proof: the byte-wise XOR of {@code clientSignature}
 * and {@code serverSignature}, encoded as Base64 text.
 *
 * @param clientSignature client signature; its length determines the length of the result
 * @param serverSignature server signature; read at every index of {@code clientSignature}
 * @return Base64 representation of the XOR result
 */
static String createClientProof(byte[] clientSignature, byte[] serverSignature) {
    final int length = clientSignature.length;
    final byte[] xored = new byte[length];
    int index = 0;
    while (index < length) {
        // The narrowing cast keeps only the low 8 bits of the int result.
        xored[index] = (byte) (clientSignature[index] ^ serverSignature[index]);
        index++;
    }
    return Base64.getEncoder().encodeToString(xored);
}

/**
 * Creates the PBKDF2 hash (PBKDF2WithHmacSHA256, 256-bit output) of a password.
 *
 * @param password password to derive the key from
 * @param salt     salt bytes
 * @param rounds   iteration count
 * @return the derived key bytes
 * @throws NoSuchAlgorithmException if PBKDF2WithHmacSHA256 is not supported
 * @throws InvalidKeySpecException  if the key specification is not supported
 */
static byte[] getPBKDF2Hash(String password, byte[] salt, int rounds)
        throws NoSuchAlgorithmException, InvalidKeySpecException {
    PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, rounds, 256);
    try {
        SecretKeyFactory skf = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        return skf.generateSecret(spec).getEncoded();
    } finally {
        // The key spec holds its own copy of the password characters;
        // wipe it as soon as the key has been derived (or derivation failed).
        spec.clearPassword();
    }
}

/**
 * Computes the digest of a byte array using the algorithm named by
 * {@code SHA_256_HASH}.
 *
 * @param valueToHash bytes to hash
 * @return the digest bytes
 * @throws NoSuchAlgorithmException if the digest algorithm is not supported
 */
static byte[] getSha256Hash(byte[] valueToHash) throws NoSuchAlgorithmException {
    final MessageDigest digester = MessageDigest.getInstance(SHA_256_HASH);
    return digester.digest(valueToHash);
}

谢谢你的帮助


Tags: the key import for return param password byte
2条回答

我刚刚补充了后续的身份验证步骤,因此下面的代码完整覆盖了整个身份验证流程。我的逆变器使用的软件版本为 01.13.04122,API 版本为 0.2.0,请核对你自己的版本。

import sys
import random
import string
import base64
import json
import requests
import hashlib
import os
import hmac
from Crypto.Cipher import AES
import binascii
# pip install pycryptodome

# Value sent as "username" in the auth/start request.
USER_TYPE = "user"
# Password fed into the PBKDF2 key derivation (placeholder - replace it).
PASSWD = 'yourSecretPassword'
# Base URL of the inverter API (placeholder address - replace it).
BASE_URL = "http://xxx.xxx.xxx.xxx/api/v1"
# Endpoint paths appended to BASE_URL, in the order the script calls them.
AUTH_START = "/auth/start"
AUTH_FINISH = "/auth/finish"
AUTH_CREATE_SESSION = "/auth/create_session"
ME = "/auth/me"

def randomString(stringLength):
    """Return a random string of ``stringLength`` ASCII letters.

    The result becomes the client nonce of the authentication handshake, so
    the characters come from the ``secrets`` CSPRNG rather than the
    predictable ``random`` generator.

    Args:
        stringLength: Number of characters to generate.

    Returns:
        A ``str`` of exactly ``stringLength`` characters taken from
        ``string.ascii_letters``; the empty string when the length is 0.
    """
    # Local import: the file's import list does not include ``secrets``.
    import secrets

    letters = string.ascii_letters
    return ''.join(secrets.choice(letters) for _index in range(stringLength))

# Step 1 (auth/start): send the user name and a client nonce.
# u is the client nonce: 12 random letters, then base64-encoded to text.
u = randomString(12)
u = base64.b64encode(u.encode('utf-8')).decode('utf-8')

step1 = {
  "username": USER_TYPE,
  "nonce": u
}
step1 = json.dumps(step1)

url = BASE_URL + AUTH_START
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
response = requests.post(url, data=step1, headers=headers)
response = json.loads(response.text)
# Values from the reply that the following steps reuse:
i = response['nonce']          # nonce returned by the inverter
e = response['transactionId']  # echoed back in the later requests
o = response['rounds']         # PBKDF2 iteration count
a = response['salt']           # salt as base64 text
bitSalt = base64.b64decode(a)  # salt as raw bytes for PBKDF2

def getPBKDF2Hash(password, bytedSalt, rounds):
    """Derive a key from ``password`` with PBKDF2-HMAC-SHA256.

    Args:
        password: Password as ``str``; encoded to UTF-8 before hashing.
        bytedSalt: Salt as a bytes-like object.
        rounds: Iteration count.

    Returns:
        The derived key as ``bytes``.
    """
    password_bytes = password.encode('utf-8')
    derived_key = hashlib.pbkdf2_hmac('sha256', password_bytes, bytedSalt, rounds)
    return derived_key

# Derive the keys and compute the proof sent in step 2.
# r: PBKDF2 output for the password, the decoded salt and the round count.
r = getPBKDF2Hash(PASSWD,bitSalt,o)
# s / c: HMAC-SHA256 of the fixed labels "Client Key" / "Server Key", keyed with r.
s = hmac.new(r, "Client Key".encode('utf-8'), hashlib.sha256).digest()
c = hmac.new(r, "Server Key".encode('utf-8'), hashlib.sha256).digest()
# _: SHA-256 of the client key s.
_ = hashlib.sha256(s).digest()
# d: the message that gets signed - client nonce, inverter nonce, base64 salt
# and round count joined in a fixed textual layout.
# NOTE(review): "biws" is presumably the base64 of the SCRAM header "n,," - confirm.
d = "n=user,r="+u+",r="+i+",s="+a+",i="+str(o)+",c=biws,r="+i
# g: HMAC of d keyed with _; p: HMAC of d keyed with the server key c.
g = hmac.new(_, d.encode('utf-8'), hashlib.sha256).digest()
# NOTE(review): p is not used again in this script, e.g. it is never compared
# with the 'signature' value returned by auth/finish.
p = hmac.new(c, d.encode('utf-8'), hashlib.sha256).digest()
# f: byte-wise XOR of s and g; the generator has its own scope, so its a/b do
# not overwrite the module-level a.
f = bytes(a ^ b for (a, b) in zip(s, g))
proof = base64.b64encode(f).decode('utf-8')

# Step 2 (auth/finish): send the transaction id and the base64 proof.
step2 = {
  "transactionId": e,
  "proof": proof
}
step2 = json.dumps(step2)

url = BASE_URL + AUTH_FINISH
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
response = requests.post(url, data=step2, headers=headers)
response = json.loads(response.text)
# token is encrypted and sent back in step 3; signature is read here but not
# used anywhere else in this script.
token = response['token']
signature = response['signature']

# Step 3 (auth/create_session): encrypt the token and exchange it for a session id.
# The key is an HMAC-SHA256 keyed with _ over "Session Key", then d, then s.
y = hmac.new(_, "Session Key".encode('utf-8'), hashlib.sha256)
y.update(d.encode('utf-8'))
y.update(s)
P = y.digest()
protocol_key = P
# 16 random bytes; passed to the cipher below and sent to the inverter as "iv".
t = os.urandom(16)

# AES in GCM mode keyed with protocol_key.
# NOTE(review): t is given as the third positional argument - presumably the
# nonce/IV parameter of pycryptodome's AES.new; confirm against its docs.
e2 = AES.new(protocol_key,AES.MODE_GCM,t)
# e2 is rebound here: from now on it holds the ciphertext, not the cipher object.
e2, authtag = e2.encrypt_and_digest(token.encode('utf-8'))

# All binary values are sent as base64 text.
step3 = {
  "transactionId": e,
  "iv": base64.b64encode(t).decode('utf-8'),
  "tag": base64.b64encode(authtag).decode("utf-8"),
  "payload": base64.b64encode(e2).decode('utf-8')
}
step3 = json.dumps(step3)

headers = { 'Content-type': 'application/json', 'Accept': 'application/json' }
url = BASE_URL + AUTH_CREATE_SESSION
response = requests.post(url, data=step3, headers=headers)
response = json.loads(response.text)
sessionId = response['sessionId']

# Create a new header carrying the session id; it is reused for all further requests.
headers = { 'Content-type': 'application/json', 'Accept': 'application/json', 'authorization': "Session " + sessionId }
# Ask auth/me whether the session is authenticated; stop the script if not.
url = BASE_URL + ME
response = requests.get(url = url, headers = headers)
response = json.loads(response.text)
authOK = response['authenticated']
if not authOK:
    print("authorization NOT OK")
    sys.exit()

# Read the version information and print a one-line summary.
url = BASE_URL + "/info/version"
response = requests.get(url = url, headers = headers)
response = json.loads(response.text)
swversion = response['sw_version']
apiversion = response['api_version']
hostname = response['hostname']
name = response['name']
print("Connected to the inverter " + name + "/" + hostname + " with SW-Version " + swversion + " and API-Version " + apiversion)

# Authentication succeeded; send further requests with the same headers.

我刚写了一些连接逆变器的 Python 代码,目前可以正常工作,你可以从中看到 XOR 运算的处理方式。现在 /auth/start 和 /auth/finish 已经完成;下一步是 /auth/create_session,它还需要一些加密操作。如果有人能继续完善这个帖子,我会很高兴。

import random
import string
import base64
import json
import requests
import hashlib
import os
import hmac

# Value sent as "username" in the auth/start request.
USER_TYPE = "user"
# Password fed into the PBKDF2 key derivation (placeholder - replace it).
PASSWD = 'yourSecretPassword'
# Base URL of the inverter API (placeholder address - replace it).
BASE_URL = "http://xxx.xxx.xxx.xxx/api/v1"
# Endpoint paths appended to BASE_URL; AUTH_CREATE_SESSION is defined but not
# yet used in this version of the script.
AUTH_START = "/auth/start"
AUTH_FINISH = "/auth/finish"
AUTH_CREATE_SESSION = "/auth/create_session"

def randomString(stringLength):
    """Return a random string of ``stringLength`` ASCII letters.

    The result becomes the client nonce of the authentication handshake, so
    the characters come from the ``secrets`` CSPRNG rather than the
    predictable ``random`` generator.

    Args:
        stringLength: Number of characters to generate.

    Returns:
        A ``str`` of exactly ``stringLength`` characters taken from
        ``string.ascii_letters``; the empty string when the length is 0.
    """
    # Local import: the file's import list does not include ``secrets``.
    import secrets

    letters = string.ascii_letters
    return ''.join(secrets.choice(letters) for _index in range(stringLength))

# Step 1 (auth/start): send the user name and a client nonce.
# u is the client nonce: 12 random letters, then base64-encoded to text.
u = randomString(12)
u = base64.b64encode(u.encode('utf-8')).decode('utf-8')

step1 = {
  "username": USER_TYPE,
  "nonce": u
}
step1 = json.dumps(step1)

url = BASE_URL + AUTH_START
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
response = requests.post(url, data=step1, headers=headers)
response = json.loads(response.text)
# Values from the reply that the following steps reuse:
i = response['nonce']          # nonce returned by the inverter
e = response['transactionId']  # echoed back in the next request
o = response['rounds']         # PBKDF2 iteration count
a = response['salt']           # salt as base64 text
bitSalt = base64.b64decode(a)  # salt as raw bytes for PBKDF2

def getPBKDF2Hash(password, bytedSalt, rounds):
    """Derive a key from ``password`` with PBKDF2-HMAC-SHA256.

    Args:
        password: Password as ``str``; encoded to UTF-8 before hashing.
        bytedSalt: Salt as a bytes-like object.
        rounds: Iteration count.

    Returns:
        The derived key as ``bytes``.
    """
    password_bytes = password.encode('utf-8')
    derived_key = hashlib.pbkdf2_hmac('sha256', password_bytes, bytedSalt, rounds)
    return derived_key

# Derive the keys and compute the proof sent in step 2.
# r: PBKDF2 output for the password, the decoded salt and the round count.
r = getPBKDF2Hash(PASSWD,bitSalt,o)
# s / c: HMAC-SHA256 of the fixed labels "Client Key" / "Server Key", keyed with r.
s = hmac.new(r, "Client Key".encode('utf-8'), hashlib.sha256).digest()
c = hmac.new(r, "Server Key".encode('utf-8'), hashlib.sha256).digest()
# _: SHA-256 of the client key s.
_ = hashlib.sha256(s).digest()
# d: the message that gets signed - client nonce, inverter nonce, base64 salt
# and round count joined in a fixed textual layout.
# NOTE(review): "biws" is presumably the base64 of the SCRAM header "n,," - confirm.
d = "n=user,r="+u+",r="+i+",s="+a+",i="+str(o)+",c=biws,r="+i
# g: HMAC of d keyed with _; p: HMAC of d keyed with the server key c.
g = hmac.new(_, d.encode('utf-8'), hashlib.sha256).digest()
# NOTE(review): p is not used again in this script.
p = hmac.new(c, d.encode('utf-8'), hashlib.sha256).digest()
# f: byte-wise XOR of s and g; the generator has its own scope, so its a/b do
# not overwrite the module-level a.
f = bytes(a ^ b for (a, b) in zip(s, g))
proof = base64.b64encode(f).decode('utf-8')

# Step 2 (auth/finish): send the transaction id and the base64 proof.
step2 = {
  "transactionId": e,
  "proof": proof
}
step2 = json.dumps(step2)

url = BASE_URL + AUTH_FINISH
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
response = requests.post(url, data=step2, headers=headers)
response = json.loads(response.text)
# Both values are read from the reply but not used yet in this script.
token = response['token']
signature = response['signature']

# TODO: the remaining encryption work is not implemented in this version;
# AUTH_CREATE_SESSION is defined above but never requested.

相关问题 更多 >