Skip to content

Commit cf5c21a

Browse files
committed
Create UNIX sockets JsonRpc transport client and server
To run a simple test run UnixSocketEchoServer main() first, then run JsonRpcClientUnixSocket main(). Also tested briefly with c-lightningd (in Jan 2022, before rebase/update)
1 parent d23c250 commit cf5c21a

File tree

6 files changed

+314
-0
lines changed

6 files changed

+314
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
plugins {
2+
id 'java-library'
3+
}
4+
5+
tasks.withType(JavaCompile).configureEach {
6+
options.release = 17
7+
}
8+
9+
dependencies {
10+
api project(':consensusj-jsonrpc')
11+
12+
testImplementation "org.slf4j:slf4j-jdk14:${slf4jVersion}" // Runtime implementation of slf4j
13+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.consensusj.jsonrpc.unix;
2+
3+
import com.fasterxml.jackson.databind.JavaType;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import org.consensusj.jsonrpc.DefaultRpcClient;
7+
import org.consensusj.jsonrpc.JsonRpcMessage;
8+
import org.consensusj.jsonrpc.JsonRpcRequest;
9+
import org.consensusj.jsonrpc.JsonRpcResponse;
10+
import org.consensusj.jsonrpc.JsonRpcStatusException;
11+
import org.consensusj.jsonrpc.JsonRpcTransport;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.io.IOException;
16+
import java.net.StandardProtocolFamily;
17+
import java.net.URI;
18+
import java.net.UnixDomainSocketAddress;
19+
import java.nio.ByteBuffer;
20+
import java.nio.channels.SocketChannel;
21+
import java.nio.file.Path;
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/**
25+
* Proof-of-concept UNIX domain socket JsonRpc Client (works with {@link UnixSocketEchoServer} and {@code lightningd}.)
26+
*/
27+
public class JsonRpcClientUnixSocket implements JsonRpcTransport<JavaType> {
28+
private static final Logger log = LoggerFactory.getLogger(JsonRpcClientUnixSocket.class);
29+
private final UnixDomainSocketAddress socketAddress;
30+
private final JsonRpcUnixSocketMapper socketMapper;
31+
32+
public static void main(String[] args) throws IOException {
33+
boolean useEchoServer = true;
34+
Path socketPath = useEchoServer ? UnixSocketEchoServer.getTestPath() : getLightningRpcPath();
35+
UnixSocketTransportFactory factory = new UnixSocketTransportFactory(socketPath);
36+
try (var client = new DefaultRpcClient(factory, JsonRpcMessage.Version.V2)) {
37+
JsonNode response = client.send("getinfo", JsonNode.class);
38+
System.out.println(response);
39+
}
40+
}
41+
42+
public JsonRpcClientUnixSocket(Path socketPath, ObjectMapper mapper) {
43+
socketAddress = UnixDomainSocketAddress.of(socketPath);
44+
socketMapper = new JsonRpcUnixSocketMapper(mapper);
45+
46+
// TODO: Delete on close:
47+
// Files.deleteIfExists(socketPath);
48+
}
49+
50+
@Override
51+
public <R> JsonRpcResponse<R> sendRequestForResponse(JsonRpcRequest request, JavaType responseType) throws IOException, JsonRpcStatusException {
52+
SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX);
53+
// TODO: Use Selectable channel for async??
54+
channel.connect(socketAddress);
55+
ByteBuffer buffer = socketMapper.serializeRequest(request);
56+
while (buffer.hasRemaining()) {
57+
channel.write(buffer);
58+
}
59+
// TODO: Read response see https://www.baeldung.com/java-unix-domain-socket
60+
// See also: https://nipafx.dev/java-unix-domain-sockets/
61+
// And: https://www.linkedin.com/pulse/java-sockets-io-blocking-non-blocking-asynchronous-aliaksandr-liakh/
62+
JsonRpcResponse<R> responseJson = null;
63+
try {
64+
responseJson = socketMapper.readSocketResponse(request, responseType, channel);
65+
} catch (InterruptedException e) {
66+
e.printStackTrace();
67+
}
68+
channel.close();
69+
return responseJson;
70+
}
71+
72+
@Override
73+
public <R> CompletableFuture<JsonRpcResponse<R>> sendRequestForResponseAsync(JsonRpcRequest request, JavaType responseType) {
74+
return supplyAsync(() -> this.sendRequestForResponse(request, responseType));
75+
}
76+
77+
@Override
78+
public URI getServerURI() {
79+
return socketAddress.getPath().toUri();
80+
}
81+
82+
public static Path getLightningRpcPath() {
83+
return Path.of(System.getProperty("user.home")).resolve(".lightning/regtest/lightning-rpc");
84+
}
85+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.consensusj.jsonrpc.unix;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JavaType;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import org.consensusj.jsonrpc.JsonRpcRequest;
7+
import org.consensusj.jsonrpc.JsonRpcResponse;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.io.IOException;
12+
import java.nio.ByteBuffer;
13+
import java.nio.channels.SocketChannel;
14+
import java.util.Optional;
15+
16+
/**
17+
* Common code for JsonRpc UNIX Sockets clients and servers
18+
*/
19+
public class JsonRpcUnixSocketMapper {
20+
private static final Logger log = LoggerFactory.getLogger(JsonRpcUnixSocketMapper.class);
21+
22+
private final ObjectMapper mapper;
23+
24+
/**
25+
* @param mapper Jackson mapper
26+
*/
27+
public JsonRpcUnixSocketMapper(ObjectMapper mapper) {
28+
this.mapper = mapper;
29+
}
30+
31+
public <R> JsonRpcResponse<R> readSocketResponse(JsonRpcRequest request, JavaType responseType, SocketChannel channel) throws IOException, InterruptedException {
32+
Optional<String> resp = Optional.empty();
33+
while (resp.isEmpty()) {
34+
Thread.sleep(100);
35+
resp = readSocketMessage(channel);
36+
resp.ifPresent(System.out::println);
37+
}
38+
JsonRpcResponse<R> responseJson = deserializeResponse(responseType, resp.orElseThrow());
39+
return responseJson;
40+
}
41+
42+
public Optional<String> readSocketMessage(SocketChannel channel) throws IOException {
43+
ByteBuffer buffer = ByteBuffer.allocate(10240);
44+
int bytesRead = channel.read(buffer);
45+
if (bytesRead < 0)
46+
return Optional.empty();
47+
48+
byte[] bytes = new byte[bytesRead];
49+
buffer.flip();
50+
buffer.get(bytes);
51+
String message = new String(bytes);
52+
return Optional.of(message);
53+
}
54+
55+
public <R> JsonRpcResponse<R> deserializeResponse(JavaType responseType, String s) throws JsonProcessingException {
56+
JsonRpcResponse<R> responseJson;
57+
log.debug("Response String: {}", s);
58+
try {
59+
responseJson = mapper.readValue(s, responseType);
60+
} catch (JsonProcessingException e) {
61+
log.error("JsonProcessingException: ", e);
62+
// TODO: Map to some kind of JsonRPC exception similar to JsonRPCStatusException
63+
throw e;
64+
}
65+
return responseJson;
66+
}
67+
68+
public JsonRpcRequest deserializeRequest(String s) throws JsonProcessingException {
69+
JsonRpcRequest requestJson;
70+
log.debug("Request String: {}", s);
71+
try {
72+
requestJson = mapper.readValue(s, JsonRpcRequest.class);
73+
} catch (JsonProcessingException e) {
74+
log.error("JsonProcessingException: ", e);
75+
// TODO: Map to some kind of JsonRPC exception similar to JsonRPCStatusException
76+
throw e;
77+
}
78+
return requestJson;
79+
}
80+
81+
public ByteBuffer serializeRequest(JsonRpcRequest request) throws JsonProcessingException {
82+
String message = mapper.writeValueAsString(request);
83+
ByteBuffer buffer = ByteBuffer.allocate(1024);
84+
buffer.clear();
85+
buffer.put(message.getBytes());
86+
buffer.flip();
87+
return buffer;
88+
}
89+
90+
public <R> ByteBuffer serializeResponse(JsonRpcResponse<R> response) throws JsonProcessingException {
91+
String message = mapper.writeValueAsString(response);
92+
ByteBuffer buffer = ByteBuffer.allocate(1024);
93+
buffer.clear();
94+
buffer.put(message.getBytes());
95+
buffer.flip();
96+
return buffer;
97+
}
98+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.consensusj.jsonrpc.unix;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.DeserializationFeature;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import org.consensusj.jsonrpc.JsonRpcError;
7+
import org.consensusj.jsonrpc.JsonRpcRequest;
8+
import org.consensusj.jsonrpc.JsonRpcResponse;
9+
10+
import java.io.IOException;
11+
import java.net.StandardProtocolFamily;
12+
import java.net.UnixDomainSocketAddress;
13+
import java.nio.ByteBuffer;
14+
import java.nio.channels.ServerSocketChannel;
15+
import java.nio.channels.SocketChannel;
16+
import java.nio.file.Files;
17+
import java.nio.file.Path;
18+
19+
/**
20+
* For possible implementation with CompletableFuture,
21+
* See https://github.com/IBM/java-async-util/blob/master/asyncutil/src/test/java/com/ibm/asyncutil/examples/nio/nio.md#nio-bridge
22+
*
23+
*/
24+
public class UnixSocketEchoServer {
25+
private final UnixDomainSocketAddress socketAddress;
26+
private final JsonRpcUnixSocketMapper socketMapper;
27+
28+
public static void main(String[] args) throws IOException, InterruptedException {
29+
Path socketPath = getTestPath();
30+
Files.deleteIfExists(socketPath);
31+
UnixSocketEchoServer server = new UnixSocketEchoServer(socketPath);
32+
server.run();
33+
}
34+
35+
public UnixSocketEchoServer(Path socketPath) {
36+
socketAddress = UnixDomainSocketAddress.of(socketPath);
37+
socketMapper = new JsonRpcUnixSocketMapper(getMapper());
38+
39+
// TODO: Delete on close:
40+
// Files.deleteIfExists(socketPath);
41+
}
42+
43+
public void run() throws IOException, InterruptedException {
44+
ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
45+
serverChannel.bind(socketAddress);
46+
SocketChannel channel = serverChannel.accept();
47+
while (true) {
48+
var optMessage = socketMapper.readSocketMessage(channel);
49+
if (optMessage.isPresent()) {
50+
processMessage(channel, optMessage.get());
51+
}
52+
channel.close();
53+
channel = serverChannel.accept();
54+
Thread.sleep(100);
55+
}
56+
}
57+
58+
private void processMessage(SocketChannel channel, String message) throws IOException {
59+
System.out.printf("[Client message] %s\n", message);
60+
JsonRpcRequest request;
61+
try {
62+
request = socketMapper.deserializeRequest(message);
63+
System.out.println("Got " + request.getMethod() + " request");
64+
JsonRpcResponse<?> response = switch (request.getMethod()) {
65+
case "getinfo" -> new JsonRpcResponse<>(request, "Echo GETINFO Response");
66+
default -> new JsonRpcResponse<>(request, JsonRpcError.of(JsonRpcError.Error.METHOD_NOT_FOUND));
67+
};
68+
ByteBuffer buffer = socketMapper.serializeResponse(response);
69+
while (buffer.hasRemaining()) {
70+
channel.write(buffer);
71+
}
72+
} catch (JsonProcessingException e) {
73+
throw new RuntimeException(e);
74+
}
75+
}
76+
77+
static ObjectMapper getMapper() {
78+
var mapper = new ObjectMapper();
79+
// TODO: Provide external API to configure FAIL_ON_UNKNOWN_PROPERTIES
80+
// TODO: Remove "ignore unknown" annotations on various POJOs that we've defined.
81+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
82+
return mapper;
83+
}
84+
85+
static Path getTestPath() {
86+
return Path.of(System.getProperty("user.home")).resolve("consensusj.socket");
87+
}
88+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.consensusj.jsonrpc.unix;
2+
3+
import com.fasterxml.jackson.databind.JavaType;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import org.consensusj.jsonrpc.DefaultRpcClient;
6+
import org.consensusj.jsonrpc.JsonRpcTransport;
7+
8+
import java.nio.file.Path;
9+
10+
/**
11+
* Factory for creating {@link JsonRpcClientUnixSocket} from an {@link ObjectMapper}. The
12+
* factory instance is created with the {@link Path} to the desired socket.
13+
*/
14+
public class UnixSocketTransportFactory implements DefaultRpcClient.TransportFactory {
15+
private final Path socketPath;
16+
17+
/**
18+
* Create a factory instance for a given Unix socket path.
19+
* @param socketPath Path to the desired socket.
20+
*/
21+
public UnixSocketTransportFactory(Path socketPath) {
22+
this.socketPath = socketPath;
23+
}
24+
25+
@Override
26+
public JsonRpcTransport<JavaType> create(ObjectMapper mapper) {
27+
return new JsonRpcClientUnixSocket(socketPath, mapper);
28+
}
29+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ if (JavaVersion.current().compareTo(JavaVersion.VERSION_17) < 0) {
3434

3535
// JDK 17
3636
include 'consensusj-jsonrpc-daemon' // JSON-RPC sample server
37+
include 'consensusj-jsonrpc-unix' // UNIX Domain Sockets for JSONRPC
3738
include 'cj-bitcoinj-spock' // Spock tests/demos of basic bitcoinj capabilities
3839
include 'cj-bitcoinj-dsl-js' // JavaScript DSL for bitcoinj via Nashorn
3940
include 'cj-btc-daemon' // Prototype Micronaut version of Bitcoin daemon

0 commit comments

Comments
 (0)