PostgreSQL中锁定和更新选项的一致性

3 投票
3 回答
10327 浏览
提问于 2025-04-16 20:23

我有一个应用程序,它可以支持一定数量的同时操作。这些操作通过一个在Postgres数据库中的“插槽”表来表示。当节点上线时,它们会在表中插入一些行,每个插槽对应一行。当任务占用这些插槽时,它们会更新表中的一行,标记占用一个插槽,并在完成后释放它。

插槽表的结构如下:

CREATE TABLE slots (
    id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
    node_name TEXT NOT NULL,
    job_name TEXT
);

这个表在任何时候都有一些固定数量的行,每一行可能会有一个任务名称,也可能没有。

当一个新任务想要启动时,它会运行一些查询来获取应该在哪个节点上运行的名称:

BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    LIMIT 1
    FOR UPDATE;

(节点名称和ID是从游标中读取的)

UPDATE slots
    SET job_name = %(job_name)s
    WHERE id = %(slot_id)s;
COMMIT;

通常情况下,它能够在不丢失任何更新的情况下占用行,但在并发量较高时,只有少数几行会被占用,而许多SELECT ... FOR UPDATE和UPDATE查询已经执行。最终的结果是,我们运行的任务数量远远超过了可用的插槽数量。

我是不是犯了锁定错误?有没有更好的方法来处理这个问题?有没有什么方法是不使用表锁的?

事务级别的SERIALIZABLE并不能解决问题,只有少数几行会被填充。

我使用的是PostgreSQL 8.4版本。

3 个回答

1

你可能想了解一下建议锁锁的相关内容

虽然我没有测试过,但你可以试着把你的锁定查询改成这样:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock('slots'::regclass::int, id::int)
    LIMIT 1;

或者,因为你一开始就用了bigint(你真的需要那么多ID吗?!),可以试试这样:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock(hashtext('slots_' || id))
    LIMIT 1;

如果你这样做,要小心一些细节——建议锁需要在每个会话中明确解锁,不管事务是否成功。

另外,如果使用hashtext(),可能会有冲突的风险,但如果你是在处理任务,这对你来说应该没什么大问题……

4
BEGIN; 
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE; 
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;

这段话的意思是,在“已提交读”模式下,这个方法是有效的。它只涉及SQL(和你的代码一样),而且可以一次性执行(这样更快)。

@Seth Robertson:如果没有锁定表和循环,这样做是不安全的。

假设同时有两个事务A和B:A会先选择第一行,B也会选择第一行。A会锁定并更新这一行,而B必须等到A完成提交后才能继续。然后B会重新检查条件job_name是否为NULL。如果条件不成立(也就是不是NULL),B就不会更新,也不会选择下一行,而是只会重新检查并返回空结果。

@joegester:使用“选择并更新”并不是问题,因为整个表都被锁定了。

或许还有其他方法可以完成这个任务,比如删除并插入行(可能是在另一个表中?)而不是将值设置为NULL。不过我不太确定具体怎么做。

2

我写了一个perl程序来模拟发生的事情,因为我觉得你说的情况不太可能。实际上,在我运行我的模拟后,即使我关闭了锁定功能,也没有遇到任何问题(因为SELECT … FOR UPDATEUPDATE应该会自动处理必要的锁定)。

我在PG 8.3和PG 9.0上都运行了这个程序,结果在这两个版本上都运行得很好。

我建议你试试这个程序,或者尝试一个python版本,这样你就可以有一个很好的测试案例,可以和大家分享。如果它能正常工作,你可以研究一下有什么不同;如果不行,你也有东西可以让其他人来试试。

#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };

sub worker($)
{
  my ($i) = @_;
  my ($job);

  my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

  my ($x) = 0;
  while(++$x)
  {
#    $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
    my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");

    if ($#id < 0)
    {
      $dbh->rollback;
      sleep(.5);
      next;
    }
    $job = "$$-$i-($x)";
    $dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
    $dbh->commit || die "Cannot commit\n";
    last;
  }
  if (!$job)
  {
    print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
    return;
  }
  else
  {
    print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
  }
  sleep(rand(5));

#  $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
  $dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
  print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
  $dbh->commit || die "Cannot commit";
}

my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;

for(my $i=0;$i<200;$i++)
{
  if (!fork)
  {
    worker($i);
    exit(0);
  }

  if (++$numchild > 50)
  {
    sleep(1);
  }
}
while (wait > 0)
{
  $numchild--;
  print "Waiting numchild $numchild\n";
  sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
  printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
  $sum += $slot->[2];
}
print "Successfully made $sum entries\n";

撰写回答