PostgreSQL中锁定和更新选项的一致性
我有一个应用程序,它可以支持一定数量的同时操作。这些操作通过一个在Postgres数据库中的“插槽”表来表示。当节点上线时,它们会在表中插入一些行,每个插槽对应一行。当任务占用这些插槽时,它们会更新表中的一行,标记占用一个插槽,并在完成后释放它。
插槽表的结构如下:
CREATE TABLE slots (
id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
node_name TEXT NOT NULL,
job_name TEXT
);
这个表在任何时候都有一些固定数量的行,每一行可能会有一个任务名称,也可能没有。
当一个新任务想要启动时,它会运行一些查询来获取应该在哪个节点上运行的名称:
BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
FROM slots
WHERE job_name IS NULL
LIMIT 1
FOR UPDATE;
(节点名称和ID是从游标中读取的)
UPDATE slots
SET job_name = %(job_name)s
WHERE id = %(slot_id)s;
COMMIT;
通常情况下,它能够在不丢失任何更新的情况下占用行,但在并发量较高时,只有少数几行会被占用,而许多SELECT ... FOR UPDATE和UPDATE查询已经执行。最终的结果是,我们运行的任务数量远远超过了可用的插槽数量。
我是不是犯了锁定错误?有没有更好的方法来处理这个问题?有没有什么方法是不使用表锁的?
事务级别的SERIALIZABLE并不能解决问题,只有少数几行会被填充。
我使用的是PostgreSQL 8.4版本。
3 个回答
虽然我没有测试过,但你可以试着把你的锁定查询改成这样:
BEGIN;
SELECT id, node_name
FROM slots
WHERE job_name IS NULL
AND pg_try_advisory_lock('slots'::regclass::int, id::int)
LIMIT 1;
或者,因为你一开始就用了bigint(你真的需要那么多ID吗?!),可以试试这样:
BEGIN;
SELECT id, node_name
FROM slots
WHERE job_name IS NULL
AND pg_try_advisory_lock(hashtext('slots_' || id))
LIMIT 1;
如果你这样做,要小心一些细节——建议锁需要在每个会话中明确解锁,不管事务是否成功。
另外,如果使用hashtext()
,可能会有冲突的风险,但如果你是在处理任务,这对你来说应该没什么大问题……
BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;
这段话的意思是,在“已提交读”模式下,这个方法是有效的。它只涉及SQL(和你的代码一样),而且可以一次性执行(这样更快)。
@Seth Robertson:如果没有锁定表和循环,这样做是不安全的。
假设同时有两个事务A和B:A会先选择第一行,B也会选择第一行。A会锁定并更新这一行,而B必须等到A完成提交后才能继续。然后B会重新检查条件job_name是否为NULL。如果条件不成立(也就是不是NULL),B就不会更新,也不会选择下一行,而是只会重新检查并返回空结果。
@joegester:使用“选择并更新”并不是问题,因为整个表都被锁定了。
或许还有其他方法可以完成这个任务,比如删除并插入行(可能是在另一个表中?)而不是将值设置为NULL。不过我不太确定具体怎么做。
我写了一个perl程序来模拟发生的事情,因为我觉得你说的情况不太可能。实际上,在我运行我的模拟后,即使我关闭了锁定功能,也没有遇到任何问题(因为SELECT … FOR UPDATE
和UPDATE
应该会自动处理必要的锁定)。
我在PG 8.3和PG 9.0上都运行了这个程序,结果在这两个版本上都运行得很好。
我建议你试试这个程序,或者尝试一个python版本,这样你就可以有一个很好的测试案例,可以和大家分享。如果它能正常工作,你可以研究一下有什么不同;如果不行,你也有东西可以让其他人来试试。
#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };
sub worker($)
{
my ($i) = @_;
my ($job);
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my ($x) = 0;
while(++$x)
{
# $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");
if ($#id < 0)
{
$dbh->rollback;
sleep(.5);
next;
}
$job = "$$-$i-($x)";
$dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
$dbh->commit || die "Cannot commit\n";
last;
}
if (!$job)
{
print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
return;
}
else
{
print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
}
sleep(rand(5));
# $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
$dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
$dbh->commit || die "Cannot commit";
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;
for(my $i=0;$i<200;$i++)
{
if (!fork)
{
worker($i);
exit(0);
}
if (++$numchild > 50)
{
sleep(1);
}
}
while (wait > 0)
{
$numchild--;
print "Waiting numchild $numchild\n";
sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
$sum += $slot->[2];
}
print "Successfully made $sum entries\n";