Python通过套接字加密数据

2024-06-16 11:04:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在开发一个Python应用程序,分为两个脚本,每个脚本将放在一个Docker容器中。你知道吗

其中一个脚本模拟automate的工作,创建随机生成的数据,我将其存储在dict对象中,然后通过python socket库将其发送给另一个脚本。你知道吗

我发送的数据如下所示:

{
  "unit": 1,
  "timestamp": 1561566549688,
  "params": {
    "0": {
      "temp_tank": 3.2,
      "temp_ext": 11.7,
      "tank_weight": 3686,
      "finished_product_weight": 406,
      "pH_rate": 7.2,
      "potassium_rate": 36,
      "NaCL_concentration": 1.7,
      "salmonella_level": 18,
      "e-coli_level": 40,
      "listeria_level": 30
    },
    "1": {
      "temp_tank": 3.6,
      "temp_ext": 10.2,
      "tank_weight": 4597,
      "finished_product_weight": -219,
      "pH_rate": 7.0,
      "potassium_rate": 44,
      "NaCL_concentration": 1.4,
      "salmonella_level": 29,
      "e-coli_level": 41,
      "listeria_level": 39
    },
    "2": {
      "temp_tank": 3.6,
      "temp_ext": 11.7,
      "tank_weight": 4587,
      "finished_product_weight": -938,
      "pH_rate": 6.9,
      "potassium_rate": 39,
      "NaCL_concentration": 1.1,
      "salmonella_level": 35,
      "e-coli_level": 42,
      "listeria_level": 50
    },
    "3": {
      "temp_tank": 3.3,
      "temp_ext": 10.3,
      "tank_weight": 4120,
      "finished_product_weight": -38,
      "pH_rate": 7.2,
      "potassium_rate": 41,
      "NaCL_concentration": 1.7,
      "salmonella_level": 32,
      "e-coli_level": 38,
      "listeria_level": 41
    },
    "4": {
      "temp_tank": 2.5,
      "temp_ext": 12.5,
      "tank_weight": 3723,
      "finished_product_weight": -134,
      "pH_rate": 7.1,
      "potassium_rate": 43,
      "NaCL_concentration": 1.3,
      "salmonella_level": 22,
      "e-coli_level": 39,
      "listeria_level": 30
    },
    "5": {
      "temp_tank": 3.2,
      "temp_ext": 9.5,
      "tank_weight": 4236,
      "finished_product_weight": 116,
      "pH_rate": 7.2,
      "potassium_rate": 41,
      "NaCL_concentration": 1.4,
      "salmonella_level": 19,
      "e-coli_level": 39,
      "listeria_level": 44
    },
    "6": {
      "temp_tank": 2.8,
      "temp_ext": 11.6,
      "tank_weight": 4279,
      "finished_product_weight": -616,
      "pH_rate": 6.9,
      "potassium_rate": 46,
      "NaCL_concentration": 1.2,
      "salmonella_level": 29,
      "e-coli_level": 47,
      "listeria_level": 47
    },
    "7": {
      "temp_tank": 2.8,
      "temp_ext": 13.0,
      "tank_weight": 3774,
      "finished_product_weight": 290,
      "pH_rate": 7.1,
      "potassium_rate": 38,
      "NaCL_concentration": 1.6,
      "salmonella_level": 20,
      "e-coli_level": 41,
      "listeria_level": 39
    },
    "8": {
      "temp_tank": 3.6,
      "temp_ext": 13.0,
      "tank_weight": 3978,
      "finished_product_weight": 381,
      "pH_rate": 6.8,
      "potassium_rate": 45,
      "NaCL_concentration": 1.4,
      "salmonella_level": 35,
      "e-coli_level": 37,
      "listeria_level": 49
    },
    "9": {
      "temp_tank": 2.9,
      "temp_ext": 10.2,
      "tank_weight": 4109,
      "finished_product_weight": -174,
      "pH_rate": 7.1,
      "potassium_rate": 46,
      "NaCL_concentration": 1.5,
      "salmonella_level": 24,
      "e-coli_level": 36,
      "listeria_level": 45
    }
  }
}

第二个脚本使用相同的socket库检索该数据,然后将其推入远程数据库。你知道吗

这部分工作没有任何问题,我可以很容易地将数据字典的json格式的字符串从生成脚本发送到另一个脚本,带有套接字。你知道吗

我现在的问题是,我想通过对json格式的字符串进行非对称加密和解密来保护套接字连接。你知道吗

为此,我使用cryptographypython库并使用RSA加密。你知道吗

当我启动脚本时,当到达“socketsend”行时,两个脚本都崩溃了。第一个输出此错误:

cryptography.exceptions.InternalError: Unknown OpenSSL error. This error is commonly encountered when another library is not cleaning up the OpenSSL error stack. If you are using cryptography with another library that us es OpenSSL try disabling it before reporting a bug. Otherwise please file an issue at https://github.com/pyca/cryptography/issues with information on how to reproduce this. ([])

第二个输出:

ValueError: Ciphertext length must be equal to key size.

我似乎没有任何其他运行库使用OpenSSL。你知道吗

以下是我的两个脚本:

生成.py

##
#   Generate file containing data from automates
##

import sys
import random
import json
import glob
import os
import time
import socket
import threading

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding


# Generate the private and public keys once

# from cryptography.hazmat.primitives.asymmetric import rsa
# private_key = rsa.generate_private_key(
#     public_exponent=65537,
#     key_size=20480,
#     backend=default_backend()
# )
# public_key = private_key.public_key()
#
# pem = private_key.private_bytes(
#     encoding=serialization.Encoding.PEM,
#     format=serialization.PrivateFormat.PKCS8,
#     encryption_algorithm=serialization.NoEncryption()
# )
#
# with open('private.pem', 'wb') as f:
#     f.write(pem)
#
# pem = public_key.public_bytes(
#     encoding=serialization.Encoding.PEM,
#     format=serialization.PublicFormat.SubjectPublicKeyInfo
# )
#
# with open('public.pem', 'wb') as f:
#     f.write(pem)

with open("public.pem", "rb") as key_file:
    public_key = serialization.load_pem_public_key(
        key_file.read(),
        backend=default_backend()
    )


# Handle argument related errors, we only want 2! No more no less.
if len(sys.argv) > 3:
    print("Please give max 2 arguments.")
    exit()
elif len(sys.argv) < 2:
    print("Please give the amount of time for the update cycle (in seconds), and the unit number.")
    exit()
else:
    print("Initializing...")

# Registering the passed arguments
cycle_value = float(sys.argv[1])
unit_number = int(sys.argv[2])


def encrypt_data(data):
    return bytes(base64.b64encode(public_key.encrypt(
        data,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )), 'utf-8')


# Get the tank weight of the previous data set
def get_last_tank_weight(id):
    if not os.path.isdir('generated-data'):
        os.mkdir('generated-data')
    if len(glob.glob('generated-data/*.json')) == 0:
        return 0
    else:
        list_of_files = glob.glob('generated-data/*.json')
        latest_file = max(list_of_files, key=os.path.getctime)

        return int(json.loads(open(latest_file, 'r').read())["params"][str(id - 1)]["tank_weight"])


# Generate random value for each automate
def generate_values(id):
    tank_weight = int(round(random.uniform(3512, 4607), 0))

    params = {
        "temp_tank": round(random.uniform(2.5, 4), 1),
        "temp_ext": round(random.uniform(8, 14), 1),
        "tank_weight": tank_weight,
        "finished_product_weight": get_last_tank_weight(id) - tank_weight,
        "pH_rate": round(random.uniform(6.8, 7.2), 1),
        "potassium_rate": int(round(random.uniform(35, 47), 0)),
        "NaCL_concentration": round(random.uniform(1.0, 1.7), 1),
        "salmonella_level": int(round(random.uniform(17, 37), 0)),
        "e-coli_level": int(round(random.uniform(35, 49), 0)),
        "listeria_level": int(round(random.uniform(28, 54), 0))
    }

    return params


# Create and fill the json data file for the unit
def fill_json():
    timestamp = int(time.time() * 1000)

    json_output = {
        "unit": unit_number,
        "timestamp": timestamp,
        "params": {}
    }

    params = {}
    f_automates = open("automates.json", "r")
    json_automates = json.loads(f_automates.read())[str(unit_number)]

    print("Generating values...")
    for automate in json_automates:
        params[int(automate["num"]) - 1] = generate_values(automate["num"])

    json_output["params"] = params

    filename = "paramunite_" + str(unit_number) + "_" + str(timestamp) + ".json"

    with open("generated-data/" + filename, "w+") as json_file:
        json.dump(json_output, json_file)

    print("file " + filename + " successfully created.")

    print(encrypt_data(bytes(json.dumps(json_output), 'utf-8')))

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((socket.gethostname(), 25565))
        # s.connect(('pyconcentrator', 25565))

        s.sendall(encrypt_data(bytes(json.dumps(json_output), 'utf-8')))

    # Line to make script wait for new creation
    threading.Timer(cycle_value, fill_json).start()


# Launch main script!
fill_json()

集中器.py

import socket
import json
import base64
import mysql.connector as mariadb

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding


host = socket.gethostname()
port = 25565

s = socket.socket()
s.bind((host, port))
s.listen(60)

with open("private.pem", "rb") as key_file:
    private_key = serialization.load_pem_private_key(
        key_file.read(),
        password=None,
        backend=default_backend()
    )


def decrypt_data(data):
    return private_key.decrypt(
        data,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )


# Connect to MariaDB
def connect_to_db():
    return mariadb.connect(
        host='mariadbaubeurre', port=3306, user='root', password='beurette', database='beurre'
        # host='127.0.0.1', port=3307, user='root', password='', database='test'
    )


# Get the type of the automate given its id, by searching in the automates.json file
def get_automate_type(id, unit):
    return json.loads(open('automates.json', 'r').read())[str(unit)][int(id)-1]["type"]


def send_to_db(data):
    mariadb_connection = connect_to_db()
    cursor = mariadb_connection.cursor()

    i = 1
    # Prepare INSERT SQL query
    sql = "INSERT INTO measures (`measure_datetime`, `measure_temp_cuve`, `measure_temp_ext`, `measure_weight_milk`, `measure_weight_product`, `measure_ph`, `measure_k_plus`, `measure_nacl`, `measure_bacteria_salmonella`, `measure_bacteria_ecoli`, `measure_bacteria_listeria`, `measure_fk_automaton`) VALUES "
    for y in data["params"]:
        sql += "({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}),".format(
            data["timestamp"],
            data["params"][y]["temp_tank"],
            data["params"][y]["temp_ext"],
            data["params"][y]["tank_weight"],
            data["params"][y]["finished_product_weight"],
            data["params"][y]["pH_rate"],
            data["params"][y]["potassium_rate"],
            data["params"][y]["NaCL_concentration"],
            data["params"][y]["salmonella_level"],
            data["params"][y]["e-coli_level"],
            data["params"][y]["listeria_level"],
            "(SELECT automaton_id FROM automatons WHERE automaton_fk_unit = (SELECT unit_id FROM `units` WHERE unit_fk_site = 1 AND unit_num = {}) AND automaton_type LIKE '{}' AND automaton_num = {})".format(
                data["unit"], get_automate_type(i, data["unit"]), i)
        )
        i += 1

    cursor.execute(sql[:-1] + ";")
    mariadb_connection.commit()
    mariadb_connection.close()


def wait_for_data():
    output = ""

    print("Waiting for connection...")

    conn, addr = s.accept()
    print("Receiving from ", addr, "...")
    while True:
        data = conn.recv(1024)
        output += decrypt_data(data).decode('utf-8')
        if not data:
            break

    print(json.loads(output))
    send_to_db(json.loads(output))
    wait_for_data()


wait_for_data()

s.close()

我被这件事困住了,找不到任何解决办法。 我是一个密码学的呆子,所以也许我没有使用正确的加密方法或类似的东西,但我有点迷失了所有的加密的可能性。你知道吗

谢谢你!!你知道吗


Tags: keyimportjsondatarateunitparamsproduct