有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

维特斯。io java+twitter4j事件循环集成

我正在尝试将twitter4j与vertx事件循环集成在一起,但我不知道我的方法是否正确。 我是vertx的新手,虽然我是一个节点。js开发者,因此我熟悉事件循环/单线程的概念

在我的测试中,我想订阅一个twitter流,并在vertx事件总线上发布该流

我已经创建了一个不可知的TwitterAPI类,它将把twitter流转换成一个可观察的(稍后将连接到vertx):

public class TwitterAPI {
    public Observable<Status> getTwitterObservable() {
        return Observable.create(emitter -> {
            final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener(){
                public void onStatus(Status status) {
                    emitter.onNext(status);
                }
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
                public void onException(Exception ex) {
                    emitter.onError(ex);
                }
                @Override
                public void onScrubGeo(long userId, long upToStatusId) {}

                @Override
                public void onStallWarning(StallWarning warning) {}
            });

            //twitterStream.filter("some keyword");
            twitterStream.sample();
        });
    }
}

然后,我创建了一个TwitterVerticle,它将侦听上述可观察内容并在事件总线上发布流,以便其他一些Verticle可以订阅并处理它:

public class TwitterVerticle extends AbstractVerticle {
    public void start() {
        EventBus eb = this.vertx.eventBus();
        TwitterAPI twitterAPI = new TwitterAPI();
        twitterAPI.getTwitterObservable()
            .map(Status::getText)
            .filter(text -> text.startsWith("my keyword"))
            .subscribe(text -> {
                eb.publish("tweet-feed", text);
            });
    }
}

例如,我创建了另一个verticle,它将在事件总线上收听“twitter提要”,并将其发布到WebSocket上,这样您就可以在浏览器中看到提要

乍一看,一切都很顺利。。。但是我的主要问题是:我不确定twitter4j是否能很好地处理事件循环,也许我的集成技术是错误的。也许我应该让TwitterVerticle成为工人Verticle

有人能看一看,让我知道这是完成这项任务的最好方法吗

非常感谢

编辑

在这种情况下,直接在事件总线上发布是更好的模式吗? 以下是修改后的代码:

public class TwitterAPI {

    public void publishStreamOnEventBus(Vertx vertx) {
        EventBus eb = vertx.eventBus();
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

        twitterStream.addListener(new StatusListener(){
            public void onStatus(Status status) {
                eb.publish("tweet-feed", status.getText());
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onException(Exception ex) {
                //emitter.onError(ex);
            }
            @Override
            public void onScrubGeo(long userId, long upToStatusId) {}

            @Override
            public void onStallWarning(StallWarning warning) {}
        });

        twitterStream.sample();
    }
}



public class Main {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        new TwitterAPI().publishStreamOnEventBus(vertx);

        //vertx.deployVerticle(new TwitterVerticle()/*, new DeploymentOptions().setWorker(true)*/);
        vertx.deployVerticle(new WebServerVerticle());
    }

}

共 (1) 个答案

  1. # 1 楼答案

    首先是一些一般性建议

    1/如果订阅Observable涉及调用阻塞API,请使用blockingScheduler

    Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
    Observable<String> obs = twitterObservable.subscribeOn(blockingScheduler);
    

    2/然后我假设twitter4j使用自己的线程调用StatusListener,因此EventBus#publish调用将在其中一个线程上进行。要返回垂直上下文,请使用#observeOn运算符:

    Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
    Observable<String> obs = twitterObservable.observeOn(contextScheduler);
    

    结合两个变化:

    public class TwitterVerticle extends AbstractVerticle {
        public void start() {
            EventBus eb = this.vertx.eventBus();
            Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
            Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
            TwitterAPI twitterAPI = new TwitterAPI();
            twitterAPI.getTwitterObservable()
                .map(Status::getText)
                .filter(text -> text.startsWith("my keyword"))
                .observeOn(contextScheduler)
                .subscribeOn(blockingScheduler)
                .subscribe(text -> {
                    eb.publish("tweet-feed", text);
                });
        }
    }
    

    综上所述,如果这是verticle的唯一工作,我建议将其删除,并简单地发布到事件总线。事件总线实例是线程安全的,可以从外部(非Vert.x)世界调用publish。事实上,这是一个很好的模式来结合垂直。带有遗留代码的x代码