RPC demo：支持普通 RPC 调用和服务端流式调用（server-side streaming）
启动provider
// Provider settings: the service interface(s) involved, the serializer, and the
// server socket address (127.0.0.1:8091).
Config config =
new Config()
.setRefClasses(Collections.singletonList(UserApi.class))
.setSerializer(new FastJsonSerializer())
.setServerSocketAddr(new InetSocketAddress("127.0.0.1", 8091));
// Derive the server-side configuration from the settings above.
// NOTE(review): whether copy() is deep or shallow is not visible here — confirm.
ServerConfig serverConfig = ServerConfig.copy(config);
// Register UserApiImpl as the implementation exported for the UserApi interface.
serverConfig.exportService(UserApi.class, new UserApiImpl());
// Create the server bootstrap, then initialize and start it (in that order).
// NOTE(review): presumably start() binds 127.0.0.1:8091 — confirm in ServerBootstrap.
ServerBootstrap serverBootstrap = new ServerBootstrap(serverConfig);
serverBootstrap.init();
serverBootstrap.start();
启动consumer
// Consumer settings: the service interface(s) to reference, the serializer, and
// the server socket address (127.0.0.1:8091).
// NOTE(review): presumably serializer and address must match the provider's — confirm.
Config config =
new Config()
.setRefClasses(Collections.singletonList(UserApi.class))
.setSerializer(new FastJsonSerializer())
.setServerSocketAddr(new InetSocketAddress("127.0.0.1", 8091));
// Create the client bootstrap, then initialize and start it (in that order).
ClientBootstrap clientBootstrap = new ClientBootstrap(config);
clientBootstrap.init();
clientBootstrap.start();
获取引用并调用测试（普通 RPC）
// Obtain the UserApi reference from the client bootstrap.
UserApi userApi = clientBootstrap.getReference(UserApi.class);

// Plain RPC round trip: add one user, read it back by id, then list all users.
User newUser = new User("1", "lanicc", new Date());
userApi.add(newUser);
System.out.println(userApi.findById("1"));
System.out.println(userApi.list());
流式调用：接口定义
/**
 * Streaming call: results are delivered through the supplied observer instead
 * of a return value.
 *
 * @param userStreamObserver callback that receives users via {@code onNext} and
 *     the end of the stream via {@code onCompleted}
 */
void iterate(StreamObserver<User> userStreamObserver);
客户端调用
// Streaming call: pass an observer whose callbacks receive the results.
userApi.iterate(new StreamObserver<User>() {
// Called with each user pushed to this observer.
@Override
public void onNext(User user) {
System.out.println("onNext: " + user);
// NOTE(review): stopWatch is declared outside this snippet; countDown() suggests
// a CountDownLatch the caller waits on — confirm.
stopWatch.countDown();
}
// Called when the stream has ended.
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
服务端流式回写
/**
 * Streams every user in {@code users} to the given observer, pausing one second
 * after each element, and always signals completion at the end.
 *
 * @param userStreamObserver observer that receives each user via {@code onNext}
 *     and the end of the stream via {@code onCompleted}
 * @throws RuntimeException if the thread is interrupted while pausing between
 *     elements; the thread's interrupt flag is restored before throwing
 */
@Override
public void iterate(StreamObserver<User> userStreamObserver) {
    logger.info("iterate all user: {}", userStreamObserver.getClass());
    try {
        for (User user : users.values()) {
            userStreamObserver.onNext(user);
            try {
                // Demo pacing: emit one element per second.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still
                // observe the interruption after the exception is wrapped.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    } finally {
        // Completion is signalled even when the loop fails, so the observer is
        // always told that the stream has ended.
        userStreamObserver.onCompleted();
    }
}