如何在Python脚本中添加sasl.mechanism PLAIN(API)和GSSAPI(Kerberos)身份验证的配置设置

2024-04-28 07:35:55 发布

您现在位置:Python中文网/ 问答频道 /正文

需要一些帮助来配置sasl.mechanism PLAIN(API)和GSSAPI(Kerberos)身份验证

这里我们使用的是Confluent Kafka,共有两个脚本:一个是Python脚本,另一个是调用该Python脚本的bash脚本。您可以在下面找到脚本

提前谢谢你的帮助

import hashlib
import json
import math
import os
import random
import re
import socket
import string
import sys
import time
import uuid
from datetime import datetime
from datetime import timezone
from functools import cache
from functools import lru_cache

from confluent_kafka import Producer, KafkaError, KafkaException

# Required settings: a missing variable raises KeyError as soon as the script starts.
topic_name = os.environ['TOPIC_NAME']
partition_count = int(os.environ['PARTITION_COUNT'])
# The key, value and header templates are JSON documents passed through the environment.
message_key_template, message_value_template, message_header_template = (
    json.loads(os.environ[variable])
    for variable in ('KEY_TEMPLATE', 'VALUE_TEMPLATE', 'HEADER_TEMPLATE')
)
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']

# Optional settings, each falling back to a default when the variable is unset.
perf_counter_batch_size = int(os.getenv('PERF_COUNTER_BATCH_SIZE', 100))
messages_per_aggregate = int(os.getenv('MESSAGES_PER_AGGREGATE', 1))
max_message_count = int(os.getenv('MAX_MESSAGE_COUNT', sys.maxsize))

def error_cb(err):
    """Report a generic client error and abort on the unrecoverable ones.

    Client errors are normally informational because the client tries to
    recover on its own, so most of them are only printed. Two codes are
    treated as fatal for this script: no broker being reachable
    (_ALL_BROKERS_DOWN) and authentication failures (_AUTHENTICATION).

    Args:
        err: The error object handed to the callback by the client; it must
            provide a ``code()`` method.

    Raises:
        KafkaException: When ``err`` carries one of the two fatal codes.
    """
    print("Client error: {}".format(err))

    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        # An exception raised here is re-raised from the flush() or poll()
        # call that triggered the callback.
        raise KafkaException(err)

def acked(err, msg):
    """Delivery callback that prints a line for every message that failed.

    Args:
        err: ``None`` when the delivery succeeded, otherwise the error.
        msg: The message the report refers to.
    """
    if err is None:
        # Successful deliveries are not reported.
        return
    print("Failed to send message: %s: %s" % (str(msg), str(err)))


# Base client settings used by every deployment.
producer_configs = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': socket.gethostname(),
    'error_cb': error_cb,
}

# Optional security settings. Each entry maps an environment variable to the
# client configuration property it feeds; a property is only added when its
# variable is set to a non-empty value, so an unsecured setup keeps working
# without any extra configuration.
#
#   SASL/PLAIN (API key + secret):
#       SECURITY_PROTOCOL=SASL_SSL  SASL_MECHANISM=PLAIN
#       SASL_USERNAME=<key>         SASL_PASSWORD=<secret>
#   SASL/GSSAPI (Kerberos):
#       SECURITY_PROTOCOL=SASL_SSL (or SASL_PLAINTEXT)  SASL_MECHANISM=GSSAPI
#       SASL_KERBEROS_SERVICE_NAME, SASL_KERBEROS_PRINCIPAL, SASL_KERBEROS_KEYTAB
#   Private CA / truststore (PEM file):
#       SSL_CA_LOCATION=/path/to/ca.pem
_optional_security_settings = {
    'SECURITY_PROTOCOL': 'security.protocol',
    'SASL_MECHANISM': 'sasl.mechanism',
    'SASL_USERNAME': 'sasl.username',
    'SASL_PASSWORD': 'sasl.password',
    'SASL_KERBEROS_SERVICE_NAME': 'sasl.kerberos.service.name',
    'SASL_KERBEROS_PRINCIPAL': 'sasl.kerberos.principal',
    'SASL_KERBEROS_KEYTAB': 'sasl.kerberos.keytab',
    'SSL_CA_LOCATION': 'ssl.ca.location',
}
for _env_name, _config_key in _optional_security_settings.items():
    _env_value = os.environ.get(_env_name)
    if _env_value:
        producer_configs[_config_key] = _env_value

producer = Producer(producer_configs)


def get_templated_value(term, template_values):
    """Return the value bound to ``term``, generating one on first use.

    Args:
        term: Name of the templated term.
        template_values: Dictionary caching the values generated so far; it
            is updated in place when ``term`` has no value yet.

    Returns:
        The cached value, or a new random UUID4 string that is stored under
        ``term`` before being returned.
    """
    try:
        return template_values[term]
    except KeyError:
        generated = template_values[term] = str(uuid.uuid4())
        return generated

def fill_template_value(value, template_values):
    """Render one template value by substituting its ``{{term}}`` placeholders.

    Args:
        value: The raw template value; it is converted with ``str()`` first.
        template_values: Cache of term values shared across templates, passed
            on to ``get_templated_value``.

    Returns:
        The string form of ``value`` with every placeholder found in it
        replaced by the value of its term.
    """
    rendered = str(value)
    # The placeholders are collected once, from the text before any substitution.
    for name in re.findall('{{(.+?)}}', rendered):
        placeholder = "{{" + name + "}}"
        rendered = rendered.replace(placeholder, get_templated_value(name, template_values))
    return rendered

def fill_template(template, templated_terms):
    """Render every field of a template dictionary.

    Args:
        template: Mapping of field name to raw template value.
        templated_terms: Cache of term values passed to ``fill_template_value``.

    Returns:
        A new dictionary with the same fields, each holding its rendered
        string value.
    """
    # TODO: Need to address metadata field, as it's treated as a string instead of a nested object.
    filled = {}
    for field, raw_value in template.items():
        filled[field] = fill_template_value(raw_value, templated_terms)
    return filled

@lru_cache(maxsize=65536)
def get_partition(lock_id, num_partitions=None):
    """Map a lock id to a partition index in ``[0, num_partitions)``.

    The MD5 digest of ``lock_id`` is read as a 128-bit integer and the 128-bit
    range is split into ``num_partitions`` equal buckets. Integer arithmetic
    is used throughout: dividing the 128-bit hash by a float bucket size
    loses precision and can yield an index equal to the partition count for
    hashes close to 2**128.

    Results are memoized in a bounded LRU cache so that a stream of
    never-repeating lock ids cannot grow memory without limit.

    Args:
        lock_id: String identifying the lock; encoded as UTF-8 before hashing.
        num_partitions: Number of partitions to spread over. Defaults to the
            module-level ``partition_count``.

    Returns:
        The partition index as an int.

    Raises:
        ValueError: If the partition count is smaller than 1.
    """
    if num_partitions is None:
        num_partitions = partition_count
    if num_partitions < 1:
        raise ValueError(f'partition count must be positive, got {num_partitions}')

    bits = 128
    hash_value = int(hashlib.md5(lock_id.encode('utf-8')).hexdigest(), 16)
    # floor(hash_value / (2**bits / num_partitions)) computed exactly.
    return (hash_value * num_partitions) >> bits

# Counters for the produce loop. sequence_number starts at 0 and is
# incremented before each message, so the first message carries 1.
sequence_number = 0
message_count = 0
producing = True
start_time = time.perf_counter()
aggregate_message_counter = 0

# cache for templated term values so that they match across the different templates
templated_values = {}
try:
    while producing:
        sequence_number += 1
        aggregate_message_counter += 1
        message_count += 1

        if aggregate_message_counter % messages_per_aggregate == 0:
            # reset templated values
            templated_values = {}
        else:
            # Within an aggregate only the aggregate and tenant ids are kept;
            # every other term gets a fresh value for each message.
            templated_values = {
                term: value
                for term, value in templated_values.items()
                if term in ('aggregateId', 'tenantId')
            }

        # Fill in templated field values
        message_key = fill_template(message_key_template, templated_values)
        message_value = fill_template(message_value_template, templated_values)
        message_header = fill_template(message_header_template, templated_values)

        # UTC timestamp with millisecond precision, e.g. 2024-04-28T07:35:55.123Z.
        # strftime always emits six fractional digits, so dropping the last
        # three is safe even when the microsecond component is zero.
        ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        message_header['timestamp'] = ts
        message_header['sequence_number'] = str(sequence_number)
        message_value['timestamp'] = ts
        message_value['sequenceNumber'] = sequence_number

        lock_id = message_header['lock_id']
        # partition by lock_id, since key could be random, but a given aggregate_id
        # should ALWAYS resolve to the same partition, regardless of key.
        partition = get_partition(lock_id)

        # Send message
        producer.produce(
            topic_name,
            partition=partition,
            key=json.dumps(message_key),
            value=json.dumps(message_value),
            headers=message_header,
            callback=acked,
        )
        # Serve delivery and error callbacks between flushes without blocking.
        producer.poll(0)

        if sequence_number % perf_counter_batch_size == 0:
            producer.flush()
            end_time = time.perf_counter()
            total_duration = end_time - start_time

            messages_per_second = perf_counter_batch_size / total_duration
            print(f'{messages_per_second} messages/second')

            # reset start time
            start_time = time.perf_counter()
        if message_count >= max_message_count:
            break
except Exception as e:
    print(f'ERROR: {e}')
    sys.exit(1)
finally:
    # Runs on normal completion and on failure, so queued messages get a
    # chance to be delivered before the process exits.
    producer.flush()

Tags: to, import, message, if, time, value, os, counter