AWS Lambda(python)中Cassandra数据库会话重用

2024-06-16 09:04:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试重用Cassandra集群会话以用于后续的AWS Lambda函数调用。我已经在Java中成功地实现了它,但是在python中重用会话会导致lambda调用超时(实际执行初始化的第一个调用是可以的)。在

从CloudWatch日志中我可以看到一个Heartbeat failed for connection。在我看来,会话在空闲时无法通信,并且处于无法恢复连接的不一致状态。尝试比函数超时更长或更短的idle_heartbeat_interval实际上对结果没有任何影响。在

以下是lambda函数的结构(为简洁起见,省略了一些代码):

import logging
from cassandra_client import CassandraClient

logger = logging.getLogger()
logger.setLevel(logging.INFO)

#   State of the initialization phase
flag = False

#   Cassandra instance
cassandra = None

def handle_request(event, context):

    global flag, logger, cassandra

    logger.info('Function started. Flag: %s' % (str(flag), ))

    if not flag:
        logger.info('Initialization...')
        try:
            cassandra = CassandraClient()

            #   ...

            flag = True

        except Exception as e:
            logger.error('Cannot perform initialization: '+e.message)
            exit(-1)

    #   Process the request ...
    return 'OK'

为了完整起见,我就是这样创建与集群的连接的:

^{pr2}$

是否有一些驱动程序配置的细节,python lambda行为,我不知道,以防止会话被重用?在

我确实认为awslambda是一个非常好的工具,但是对执行没有太多的控制可能会在某些方面造成混乱。任何建议都非常感谢,谢谢。在


Tags: thelambda函数importinfoawsrequestlogging
2条回答

cassandra-driver常见问题解答中也有类似的问题,其中WSGI应用程序不能与全局连接池一起工作:

Depending on your application process model, it may be forking after driver Session is created. Most IO reactors do not handle this, and problems will manifest as timeouts. [Here][1]

这至少让我正确地检查了可用的连接类:结果发现,cassandra.io.twistedreactor.TwistedConnection在awslambda上工作得很好。在

总的来说,代码是这样的:

from cassandra.cluster import Cluster
from cassandra.io.twistedreactor import TwistedConnection
import time


SESSION = Cluster([...], connection_class=TwistedConnection).connect()


def run(event, context):
    t0 = time.time()
    x = list(SESSION.execute('SELECT * FROM keyspace.table'))  # Ensure query actually evaluated
    print('took', time.time() - t0)

不过,您需要在venv中安装twisted。在

我在一分钟的crontab上运行了一夜,只看到一些连接错误(一小时内最多2个),所以总体上对这个解决方案很满意。在

另外,我还没有测试基于eventletgevent的连接,因为我不能让它们猴子修补我的应用程序,而且我也不想编译libev来在lambda上使用。也许有人想试试。在

别忘了 http://datastax.github.io/python-driver/faq.html#why-do-connections-or-io-operations-timeout-in-my-wsgi-application

我想我可以声明,这个问题是由于使用Python执行环境w.r.t.Java时lambda的不同行为引起的。在

我有时间设置了一个简单的lambda函数,它都是用javaadpython实现的。该函数只生成一个线程,该线程在while循环中打印当前时间。问题是:Java实现中的线程是否会在lambda函数返回后继续打印,相反,Python线程是否会停止?在这两种情况下,答案都是肯定的:java线程将继续打印直到配置的超时,而python将在lambda函数返回时立即停止。在

Java版本的CloudWatch日志确认:

09:55:21 START RequestId: b70e732b-e476-11e6-b2bb-e11a0dd9b311 Version: $LATEST
09:55:21 Function started: 1485510921351
09:55:21 Pre function call: 1485510921351
09:55:21 Background function: 1485510921352
09:55:21 Background function: 1485510921452
09:55:21 Background function: 1485510921552
09:55:21 Background function: 1485510921652
09:55:21 Background function: 1485510921752
09:55:21 Post function call: 1485510921852
09:55:21 Background function: 1485510921853
09:55:21 END RequestId: b70e732b-e476-11e6-b2bb-e11a0dd9b311
09:55:21 REPORT RequestId: b70e732b-e476-11e6-b2bb-e11a0dd9b311 Duration: 523.74 ms Billed Duration: 600 ms Memory Size: 256 MB Max Memory Used: 31 MB
09:55:21 Background function: 1485510921953
09:55:22 Background function: 1485510922053
...

在Python版本中:

^{pr2}$

以下是两个函数的代码:

Python

import thread
import datetime
import time


def background_call():
    while True:
        print 'background_call function: %s' % (datetime.datetime.now(), )
        time.sleep(0.1)

def lambda_handler(event, context):
    print 'Function started: %s' % (datetime.datetime.now(), )

    print 'Pre function call: %s' % (datetime.datetime.now(), )
    thread.start_new_thread(background_call, ())
    time.sleep(0.5)
    print 'Post function call: %s' % (datetime.datetime.now(), )

    return 'Needs more cowbell!'

Java

import com.amazonaws.services.lambda.runtime.*;


public class BackgroundTest implements RequestHandler<RequestClass, ResponseClass> {

    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
    }

    public ResponseClass handleRequest(RequestClass requestClass, Context context) {
        System.out.println("Function started: "+System.currentTimeMillis());
        System.out.println("Pre function call: "+System.currentTimeMillis());
        Runnable r = new Runnable() {
            public void run() {
                while(true){
                    try {
                        System.out.println("Background function: "+System.currentTimeMillis());
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread t = new Thread(r);
        t.start();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Post function call: "+System.currentTimeMillis());
        return new ResponseClass("Needs more cowbell!");
    }
}

相关问题 更多 >