借鉴 30-seconds-of-code 系列,将RSocket使用到的代码场景整理一下,方便大家参考。RSocket的API相对来说比较简单,而且Spring Framework和Spring Boot都有对其封装,所以API已经非常精简。
Composite Metadata
构建Composite Metadata
Composite Metadata是RSocket Payload的metadata数据格式,方便我们为Payload提供各种元数据信息。构建Composite Data非常简单,代码如下:
public static CompositeByteBuf buildConnectionSetupMetadata(final String clientId) {
CompositeByteBuf metadataByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
// Adding the clientId to the composite metadata
CompositeMetadataFlyweight.encodeAndAddMetadata(
metadataByteBuf,
ByteBufAllocator.DEFAULT,
"messaging/x.clientId",
ByteBufAllocator.DEFAULT.buffer().writeBytes(clientId.getBytes()));
return metadataByteBuf;
}
...
CompositeByteBuf metadataByteBuf = buildConnectionSetupMetadata(clientId);
RSocket rSocket = RSocketFactory.connect()
.setupPayload(DefaultPayload.create(Unpooled.EMPTY_BUFFER, metadataByteBuf))
.transport(TcpClientTransport.create(7000))
.start()
.block();
当然解析也非常简单,如下:
private static Map<String, Object> parseMetadata(Payload payload) {
Map<String, Object> metadataMap = new HashMap<>();
CompositeMetadata compositeMetadata = new CompositeMetadata(payload.metadata(), true);
compositeMetadata.forEach(entry -> {
byte[] bytes = new byte[entry.getContent().readableBytes()];
entry.getContent().readBytes(bytes);
metadataMap.put(entry.getMimeType(), new String(bytes, StandardCharsets.UTF_8));
});
return metadataMap;
}
Request/Response
Request/Response是典型的RPC调用,发送请求然后等待返回。在RSocket中,这个等待是异步的,而不是同步的,不会浪费你宝贵的线程资源。
Request请求
// Sending the request
rSocket.requestResponse(DefaultPayload.create(name))
.map(Payload::getDataUtf8)
.subscribe(msg -> {
// Handling the response
LOG.info("Response: {}", msg);
});
Response响应
public Mono<Payload> requestResponse(Payload payload) {
String name = payload.getDataUtf8();
if (name == null || name.isEmpty()) {
name = "You";
}
return Mono.just(DefaultPayload.create(String.format("Hello, %s!", name)));
}
Fire-and-Forget
Fire-and-Forget的使用场景也非常多,如会员注册过程中,发送短信验证码;注册后发送一封欢迎邮件等等,如果你使用堵塞的方式,用户要等待非常时间,体验非常不好,现在只需要Fire-and-Forget后,马上就可以返回。
// Sending the request
rSocket.fireAndForget(DefaultPayload.create(name))
.map(Payload::getDataUtf8)
.subscribe(msg -> {
// Handling the response
LOG.info("Response: {}", msg);
});
Request/Stream
Subscribe a stream
Request/Stream可以像普通订阅一样
rSocket.requestStream(DefaultPayload.create(Unpooled.EMPTY_BUFFER))
.doOnComplete(() -> {
LOG.info("Done");
})
.subscribe(payload -> {
byte[] bytes = new byte[payload.data().readableBytes()];
payload.data().readBytes(bytes);
LOG.info("Received: {}", new BigInteger(bytes).intValue());
});
RSocket中的设计模式
RSocket Responder创建模式
RSocket Responder是指接收并处理通讯的对方发过来的请求,这里我们会使用一个工厂模式负责创建对应Responder Handler,结构图如下:
SimpleResponderFactory会创建一个"SocketAcceptor responder()“方法,主要是为RSocket便捷操作,同时包含一些验证操作,而核心createResponder()负责创建具体的Responder Handler,如SimpleResponderImpl类。 SimpleResponderImpl类不像Servlet那样,是Singleton的。 RSocket中Responder Handler通常是每一个连接对应一个Responder对象,类似于Session的机制,所以Responder Handler中的实力变量是针对连接的, 而其中的requester就是通讯的对方,这样对等通讯完全没有问题。
详细的代码可以参考: https://github.com/linux-china/rsocket-simple-demo
RSocket Session设计
Session是指调用方的一个长连接生命存续期间的数据,如HTTP的Session是基于Cookie的机制实现的,所以我们可以非常简单地获取HttpSession对象,在多个请求之间共享一些数据。 那么在RSocket中如何设计Session机制呢? Reactor框架中有一个Reactor Context的概念,你可以在 https://projectreactor.io/docs/core/release/reference/#context 这里找到。
- 由于Reactor Context默认是不可变的,当你对context进行put操作会生成新的Context对象,所以我们要进行一些调整,创建一个默认可变的Context,如下代码:
public class MutableContext implements Context {
HashMap<Object, Object> holder = new HashMap<>();
...
}
- 接下来我们一个RSocketInterceptor,对RSocket进行拦截处理。 在拦截代码中,我们会调用subscriberContext() 添加context支持,代码如下:
public class RSocketSessionInterceptor implements RSocketInterceptor {
@Override
public RSocket apply(RSocket source) {
return new AbstractRSocket() {
private MutableContext mutableContext = new MutableContext();
@Override
public Mono<Payload> requestResponse(Payload payload) {
return source.requestResponse(payload).subscriberContext(mutableContext::putAll);
}
};
- 添加responder plugin,调用RSocketSessionInterceptor的context处理逻辑,代码如下:
RSocketFactory.receive()
.addResponderPlugin(new RSocketSessionInterceptor())
.acceptor(responderFactory.responder())
- 最后调用deferWithContext获取context,并访问session数据,代码如下:
public Mono<Payload> requestResponse(Payload payload) {
return Mono.deferWithContext((context -> {
//use context to get and set session data
});
其他
获取RSocket Requester的IP地址
在某些场景下,我们想在RSocket鉴权的时候,验证对方的IP地址,如是否在IP白名单内,从而保证一定的安全性。 在RSocket Java 1.1.0版本, DuplexConnection提供了remoteAddress()方法,你只需要通过鉴权时的requester对象的connection字段就可以就可以拿到连接方的IP地址。 下述代码中的FieldUtils类来自Apache commons-lang3。 样例如下:
public Mono<RSocket> createResponder(ConnectionSetupPayload setupPayload, RSocket requester) {
//CompositeMetadata compositeMetadata = new CompositeMetadata(setupPayload.metadata(), false);
//security authentication
try {
DuplexConnection connection = (DuplexConnection) FieldUtils.readField(requester, "connection", true);
InetSocketAddress remoteAddress = (InetSocketAddress) connection.remoteAddress();
} catch (Exception ignore) {
}
SimpleResponderImpl handler = new SimpleResponderImpl(setupPayload, requester);
return Mono.just(handler);
}
当然你也可以通过RSocket的plugin机制,来验证连接方的远程IP地址,如下:
RSocketServer.create()
.acceptor(responderFactory.responder())
.interceptors(registry -> {
registry.forConnection((type, duplexConnection) -> {
//check remote address
return duplexConnection;
});
})
异常日志处理
RSocket Java SDK中默认的异常处理是调用异常的printStackTrace()方法,如果你要调整异常的记录方式,可以调用errorConsumer进行调整,代码如如下:
RSocketFactory.receive()
.errorConsumer(error -> {
// logging
})
Metrics支持
如果你要监测RSocket请求的响应时间等Metrics参数,也非常简单,只要设置一下name,然后调用metrics()方法即可,当然你需要将MicroMeter依赖添加到项目中。 如监控RSocket的requestResponse的响应时间,代码如下:
return rsocket.requestResponse(payload)
.name("com.foobar.UserService.findUserById")
.metrics()
RSocket连接层拦截
如果想做一些连接层的拦截,也就是字节流发送到网络之前,你可以使用DuplexConnectionInterceptor。在这一层你可以进行一些网络扩展,如流控等,代码如下:
public class TokenBucketInterceptor implements DuplexConnectionInterceptor {
@Override
public DuplexConnection apply(Type type, DuplexConnection source) {
if (type.equals(Type.CLIENT)) {
return new DuplexConnectionProxy(source) {
@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
//token bucket control
return super.send(frames);
}
};
}
return source;
}
}
References
- RSocket Java SDK: https://github.com/rsocket/rsocket-java
- Spring RSocket: https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/web-reactive.html#rsocket