diff --git a/package.json b/package.json index b7325aa..ce95a91 100644 --- a/package.json +++ b/package.json @@ -124,10 +124,12 @@ }, "scripts": { "generate:proto": "npx protoc --ts_out proto_ts --proto_path src src/*.proto", + "generate:webrtc-peer": "npx protoc --ts_out proto_ts --proto_path src src/peer_transport/pb/hs.proto", "build": "aegir build", "test": "aegir test -t browser", "test:chrome": "aegir test -t browser -f \"./dist/test/**/*.spec.js\" --cov", "test:firefox": "aegir test -t browser -f \"./dist/test/**/*.spec.js\" -- --browser firefox", + "test:peer": "aegir test -t browser -f \"./dist/test/peer.browser.spec.js\"", "lint": "aegir lint", "lint:fix": "aegir lint --fix", "clean": "aegir clean", @@ -138,33 +140,41 @@ "@chainsafe/libp2p-noise": "^10.0.0", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^1.0.5", - "@libp2p/interface-stream-muxer": "^3.0.0", + "@libp2p/interface-stream-muxer": "^3.0.4", "@libp2p/interface-transport": "^2.0.0", "@libp2p/logger": "^2.0.0", "@libp2p/peer-id": "^1.1.15", - "@multiformats/multiaddr": "^11.0.3", + "@multiformats/multiaddr": "../libp2p/js-multiaddr", + "@multiformats/mafmt": "../libp2p/js-mafmt", "@protobuf-ts/runtime": "^2.8.0", "err-code": "^3.0.1", + "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.3", "it-merge": "^2.0.0", + "it-pb-stream": "^2.0.3", "it-pipe": "^2.0.4", "it-pushable": "^3.1.0", "it-stream-types": "^1.0.4", "multiformats": "^10.0.0", "multihashes": "^4.0.3", "p-defer": "^4.0.0", + "protons-runtime": "^4.0.1", + "timeout-abort-controller": "^3.0.0", "uint8arraylist": "^2.3.3", "uint8arrays": "^4.0.2", "uuid": "^9.0.0" }, "devDependencies": { "@libp2p/interface-mocks": "^8.0.1", + "@libp2p/mplex": "^7.1.1", "@libp2p/peer-id-factory": "^1.0.19", + "@libp2p/websockets": "^5.0.2", "@protobuf-ts/plugin": "^2.8.0", "@protobuf-ts/protoc": "^2.8.0", "@types/uuid": "^8.3.4", "aegir": "^37.6.6", "it-first": "^2.0.0", - "libp2p": "^0.40.0" + "libp2p": "file:../js-libp2p", + "protons": "^6.0.1" } } diff --git a/proto_ts/peer_transport/pb/hs.ts b/proto_ts/peer_transport/pb/hs.ts new file mode 100644 index 0000000..7859658 --- /dev/null +++ b/proto_ts/peer_transport/pb/hs.ts @@ -0,0 +1,97 @@ +// @generated by protobuf-ts 2.8.2 +// @generated from protobuf file "peer_transport/pb/hs.proto" (package "webrtc_peer.pb", syntax proto2) +// tslint:disable +import type { BinaryWriteOptions } from "@protobuf-ts/runtime"; +import type { IBinaryWriter } from "@protobuf-ts/runtime"; +import { WireType } from "@protobuf-ts/runtime"; +import type { BinaryReadOptions } from "@protobuf-ts/runtime"; +import type { IBinaryReader } from "@protobuf-ts/runtime"; +import { UnknownFieldHandler } from "@protobuf-ts/runtime"; +import type { PartialMessage } from "@protobuf-ts/runtime"; +import { reflectionMergePartial } from "@protobuf-ts/runtime"; +import { MESSAGE_TYPE } from "@protobuf-ts/runtime"; +import { MessageType } from "@protobuf-ts/runtime"; +/** + * @generated from protobuf message webrtc_peer.pb.Message + */ +export interface Message { + /** + * @generated from protobuf field: webrtc_peer.pb.Message.MessageType type = 1; + */ + type: Message_MessageType; + /** + * @generated from protobuf field: string data = 2; + */ + data: string; +} +/** + * @generated from protobuf enum webrtc_peer.pb.Message.MessageType + */ +export enum Message_MessageType { + /** + * @generated from protobuf enum value: OFFER = 0; + */ + OFFER = 0, + /** + * @generated from protobuf enum value: ANSWER = 1; + */ + ANSWER = 1, + /** + * @generated from protobuf enum value: CANDIDATE = 2; + */ + CANDIDATE = 2 +} +// @generated message type with reflection information, may provide speed optimized methods +class Message$Type extends MessageType { + constructor() { + super("webrtc_peer.pb.Message", [ + { no: 1, name: "type", kind: "enum", T: () => ["webrtc_peer.pb.Message.MessageType", Message_MessageType] }, + { no: 2, name: "data", kind: "scalar", T: 9 /*ScalarType.STRING*/ } + ]); + } + create(value?: PartialMessage): Message { + const message = { type: 0, data: "" }; + globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this }); + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* webrtc_peer.pb.Message.MessageType type */ 1: + message.type = reader.int32(); + break; + case /* string data */ 2: + message.data = reader.string(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* webrtc_peer.pb.Message.MessageType type = 1; */ + if (message.type !== 0) + writer.tag(1, WireType.Varint).int32(message.type); + /* string data = 2; */ + if (message.data !== "") + writer.tag(2, WireType.LengthDelimited).string(message.data); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message webrtc_peer.pb.Message + */ +export const Message = new Message$Type(); diff --git a/src/index.ts b/src/index.ts index 826069a..fb5a117 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,12 @@ import type { Transport } from '@libp2p/interface-transport' +import type { WebRTCPeerTransportComponents, WebRTCPeerTransportInit } from './peer_transport/transport.js' +import { WebRTCPeerTransport } from './peer_transport/transport.js' import { WebRTCTransport, WebRTCTransportComponents } from './transport.js' export function webRTC (): (components: WebRTCTransportComponents) => Transport { return (components: WebRTCTransportComponents) => new WebRTCTransport(components) } + +export function webRTCPeer (init: WebRTCPeerTransportInit): (components: WebRTCPeerTransportComponents) => Transport { + return (components: WebRTCPeerTransportComponents) => new WebRTCPeerTransport(components, init) +} diff --git a/src/muxer.ts b/src/muxer.ts index 7b0eb83..e101002 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -11,17 +11,17 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory { */ private readonly peerConnection: RTCPeerConnection - /** - * The string representation of the protocol, required by `StreamMuxerFactory` - */ - protocol: string = '/webrtc' - - constructor (peerConnection: RTCPeerConnection) { + constructor (peerConnection: RTCPeerConnection, readonly protocol = '/webrtc') { this.peerConnection = peerConnection + // reject any datachannels as the muxer is not yet ready to process + // streams + this.peerConnection.ondatachannel = ({ channel }) => { + channel.close() + } } createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer { - return new DataChannelMuxer(this.peerConnection, init) + return new DataChannelMuxer(this.peerConnection, this.protocol, init) } } @@ -33,10 +33,6 @@ export class DataChannelMuxer implements StreamMuxer { * WebRTC Peer Connection */ private readonly peerConnection: RTCPeerConnection - /** - * The protocol as represented in the multiaddress - */ - readonly protocol: string = '/webrtc' /** * Array of streams in the data channel @@ -63,7 +59,7 @@ export class DataChannelMuxer implements StreamMuxer { */ sink: Sink> = nopSink; - constructor (peerConnection: RTCPeerConnection, init?: StreamMuxerInit) { + constructor (peerConnection: RTCPeerConnection, readonly protocol = '/webrtc', init?: StreamMuxerInit) { /** * Initialized stream muxer */ diff --git a/src/peer_transport/listener.ts b/src/peer_transport/listener.ts new file mode 100644 index 0000000..c537dae --- /dev/null +++ b/src/peer_transport/listener.ts @@ -0,0 +1,35 @@ +// import type { ConnectionManager } from '@libp2p/interface-connection-manager' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { ListenerEvents, TransportManager, Upgrader, Listener } from '@libp2p/interface-transport' +import { EventEmitter } from '@libp2p/interfaces/events' +import { multiaddr, Multiaddr } from '@multiformats/multiaddr' + +export interface ListenerOptions { + peerId: PeerId + upgrader: Upgrader + transportManager: TransportManager +} + +export class WebRTCPeerListener extends EventEmitter implements Listener { + constructor ( + private readonly opts: ListenerOptions + ) { + super() + } + + private listeningAddrs: Multiaddr[] = [] + async listen (ma: Multiaddr): Promise { + const baseAddr = multiaddr(ma.toString().split('/webrtc-peer').find(a => a !== '')) + const tpt = this.opts.transportManager.transportForMultiaddr(baseAddr) + const listener = tpt?.createListener({ ...this.opts }) + await listener?.listen(baseAddr) + const listeningAddr = ma.encapsulate(`/p2p/${this.opts.peerId}`) + this.listeningAddrs.push(listeningAddr) + listener?.addEventListener('close', () => { + this.listeningAddrs = this.listeningAddrs.filter(a => a !== listeningAddr) + }) + } + + getAddrs (): Multiaddr[] { return this.listeningAddrs } + async close () { } +} diff --git a/src/peer_transport/pb/hs.proto b/src/peer_transport/pb/hs.proto new file mode 100644 index 0000000..cc11597 --- /dev/null +++ b/src/peer_transport/pb/hs.proto @@ -0,0 +1,14 @@ +syntax = "proto2"; + +package webrtc_peer.pb; + +message Message { + enum MessageType { + OFFER = 0; + ANSWER = 1; + CANDIDATE = 2; + } + + required MessageType type = 1; + required string data = 2; +} diff --git a/src/peer_transport/transport.ts b/src/peer_transport/transport.ts new file mode 100644 index 0000000..7e92ec2 --- /dev/null +++ b/src/peer_transport/transport.ts @@ -0,0 +1,275 @@ +import type { Connection } from '@libp2p/interface-connection' +import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport' +import type { ConnectionHandler, TransportManager, Upgrader } from '@libp2p/interface-transport' +import { multiaddr, Multiaddr } from '@multiformats/multiaddr' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' +import { pbStream, ProtobufStream } from 'it-pb-stream' +import pDefer, { DeferredPromise } from 'p-defer' +import type { PeerId } from '@libp2p/interface-peer-id' +import { abortableDuplex } from 'abortable-iterator' +import { TimeoutController } from 'timeout-abort-controller' +import { WebRTCMultiaddrConnection } from '../maconn.js' +import type { Startable } from '@libp2p/interfaces/startable' +import { DataChannelMuxerFactory } from '../muxer.js' +import { WebRTCPeerListener } from './listener.js' +import type { PeerStore } from '@libp2p/interface-peer-store' +import { logger } from '@libp2p/logger' +import * as pb from '../../proto_ts/peer_transport/pb/hs.js' +// import type { ConnectionManager } from '@libp2p/interface-connection-manager' +// import * as p from '@libp2p/peer-id' + +const log = logger('libp2p:webrtc:peer') + +const TIMEOUT = 30 * 1000 +export const TRANSPORT = '/webrtc-peer' +export const PROTOCOL = '/webrtc-peer/0.0.1' +export const CODE = 281 + +export interface WebRTCPeerTransportInit { + rtcConfiguration?: RTCConfiguration +} + +export interface WebRTCPeerTransportComponents { + peerId: PeerId + registrar: Registrar + upgrader: Upgrader + transportManager: TransportManager + // connectionManager: ConnectionManager + peerStore: PeerStore +} + +export class WebRTCPeerTransport implements Transport, Startable { + private readonly _started = false + private readonly handler?: ConnectionHandler + + constructor ( + private readonly components: WebRTCPeerTransportComponents, + private readonly init: WebRTCPeerTransportInit + ) {} + + isStarted () { + return this._started + } + + async start () { + await this.components.registrar.handle(PROTOCOL, this._onProtocol.bind(this)) + // this.components.peerStore.addEventListener('change:multiaddrs', (event) => { + // const { peerId } = event.detail + // }) + } + + async stop () { + await this.components.registrar.unhandle(PROTOCOL) + } + + createListener (options: CreateListenerOptions): Listener { + return new WebRTCPeerListener(this.components) + } + + get [Symbol.toStringTag] (): string { + return '@libp2p/webrtc-peer' + } + + get [symbol] (): true { + return true + } + + filter (multiaddrs: Multiaddr[]): Multiaddr[] { + return multiaddrs.filter((ma) => { + const codes = ma.protoCodes() + return codes.includes(CODE) + }) + } + + /* + * dial connects to a remote via the circuit relay or any other protocol + * and proceeds to upgrade to a webrtc connection. + * multiaddr of the form: /webrtc-peer/p2p/ + * For a circuit relay, this will be of the form + * /p2p//p2p-circuit/p2p//webrtc-sdp/p2p/ + */ + async dial (ma: Multiaddr, options: DialOptions): Promise { + // extract peer id + // const remotePeerId = ma.getPeerId() + // if (!remotePeerId) { + // throw("peerId should be present in multiaddr") + // } + // const remotePeer = p.peerIdFromString(remotePeerId) + const addrs = ma.toString().split('/webrtc-peer') + const relayed = multiaddr(addrs[0]) + // const destination = multiaddr(addrs[addrs.length - 1]) + // + if (options.signal == null) { + options.signal = new AbortSignal() + } + + const connection = await this.components.transportManager.dial(relayed) + + const rawStream = await connection.newStream([PROTOCOL], options) + const stream = pbStream(abortableDuplex(rawStream, options.signal)) + + const pc = new RTCPeerConnection(this.init.rtcConfiguration) + const channel = pc.createDataChannel('init') + const offer = await pc.createOffer() + await pc.setLocalDescription(offer) + + const connectedPromise = pDefer() + pc.onconnectionstatechange = (_) => { + switch (pc.connectionState) { + case 'connected': + return connectedPromise.resolve(0) + case 'closed': + case 'disconnected': + case 'failed': + return connectedPromise.reject() + } + } + options.signal.onabort = connectedPromise.reject + pc.onicecandidate = ({ candidate }) => { + writeMessage(stream, { + type: pb.Message_MessageType.CANDIDATE, + data: (candidate != null) ? JSON.stringify(candidate) : '' + }) + } + // write offer + writeMessage(stream, { type: pb.Message_MessageType.OFFER, data: offer.sdp! }) + + // read answer + const answerMessage = await readMessage(stream) + if (answerMessage.type != pb.Message_MessageType.ANSWER) { + throw new Error('should read answer') + } + + const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) + await pc.setRemoteDescription(answerSdp) + + let continueReading = true + while (continueReading) { + const result = await Promise.race([connectedPromise.promise, readMessage(stream)]) + if (result === 0) { + break + } + + const message = result as pb.Message + if (message.type != pb.Message_MessageType.CANDIDATE) { + continue + } + + if (message.data == '') { + continueReading = false + break + } + + const candidate = new RTCIceCandidate(JSON.parse(message.data)) + pc.addIceCandidate(candidate) + } + + await connectedPromise.promise + rawStream.close() + channel.close() + const result = options.upgrader.upgradeOutbound( + new WebRTCMultiaddrConnection({ + peerConnection: pc, + timeline: { open: (new Date()).getTime() }, + remoteAddr: connection.remoteAddr + }), + { + skipProtection: true, + skipEncryption: true, + muxerFactory: new DataChannelMuxerFactory(pc, '/webrtc-peer') + } + ) + void connection.close() + // TODO: hack + await new Promise(res => setTimeout(res, 100)) + return await result + } + + async _onProtocol ({ connection, stream: rawStream }: IncomingStreamData) { + const timeoutController = new TimeoutController(TIMEOUT) + const signal = timeoutController.signal + const stream = pbStream(abortableDuplex(rawStream, timeoutController.signal)) + const pc = new RTCPeerConnection(this.init.rtcConfiguration) + + const connectedPromise: DeferredPromise = pDefer() + signal.onabort = () => connectedPromise.reject() + // candidate callbacks + pc.onicecandidate = ({ candidate }) => { + writeMessage(stream, { + type: pb.Message_MessageType.CANDIDATE, + data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' + }) + } + pc.onconnectionstatechange = (_) => { + log.trace('received pc state: ', pc.connectionState) + switch (pc.connectionState) { + case 'connected': + connectedPromise.resolve(0) + break + case 'failed': + case 'disconnected': + case 'closed': + connectedPromise.reject() + } + } + + const pbOffer = await readMessage(stream) + if (pbOffer.type != pb.Message_MessageType.OFFER) { + throw new Error('initial message should be an offer') + } + const offer = new RTCSessionDescription({ + type: 'offer', + sdp: pbOffer.data + }) + + await pc.setRemoteDescription(offer) + log.trace('offer', offer) + const answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + writeMessage(stream, { type: pb.Message_MessageType.ANSWER, data: answer.sdp! }) + log.trace('answer', offer) + let continueReading = true + while (continueReading) { + const result = await Promise.race([connectedPromise.promise, readMessage(stream)]) + if (result === 0) { + break + } + + const message = result as pb.Message + if (message.type != pb.Message_MessageType.CANDIDATE) { + throw new Error('should only receive trickle candidates') + } + if (message.data === '') { + continueReading = false + break + } + + const candidate = new RTCIceCandidate(JSON.parse(message.data)) + await pc.addIceCandidate(candidate) + } + await connectedPromise.promise + rawStream.close() + const muxerFactory = new DataChannelMuxerFactory(pc, '/webrtc-peer') + const conn = await this.components.upgrader.upgradeInbound(new WebRTCMultiaddrConnection({ + peerConnection: pc, + timeline: { open: (new Date()).getTime() }, + remoteAddr: connection.remoteAddr + }), { + skipEncryption: true, + skipProtection: true, + muxerFactory + }) + if (this.handler != null) { + this.handler(conn) + } + } +} + +function writeMessage (stream: ProtobufStream, message: pb.Message) { + stream.writeLP(pb.Message.toBinary(message)) +} + +async function readMessage (stream: ProtobufStream): Promise { + const raw = await stream.readLP() + return pb.Message.fromBinary(raw.subarray()) +} diff --git a/src/sdp.ts b/src/sdp.ts index 10b1591..fff45df 100644 --- a/src/sdp.ts +++ b/src/sdp.ts @@ -12,10 +12,11 @@ const log = logger('libp2p:webrtc:sdp') * Get base2 | identity decoders */ export const mbdecoder: any = (function () { - const decoders = Object.values(bases).map((b) => b.decoder) - let acc = decoders[0].or(decoders[1]) - decoders.slice(2).forEach((d) => (acc = acc.or(d))) - return acc + // const decoders = Object.values(bases).map((b) => b.decoder) + // let acc = decoders[0].or(decoders[1]) + // decoders.slice(2).forEach((d) => (acc = acc.or(d))) + // return acc + return bases.base64url.decoder })() /** diff --git a/src/stream.ts b/src/stream.ts index 77ca333..20535df 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -7,6 +7,7 @@ import { pushable } from 'it-pushable' import defer, { DeferredPromise } from 'p-defer' import type { Source, Sink } from 'it-stream-types' import { Uint8ArrayList } from 'uint8arraylist' +// import { toString as uint8arrayToString } from 'uint8arrays/to-string' import * as pb from '../proto_ts/message.js' @@ -196,6 +197,11 @@ export class WebRTCStream implements Stream { */ closeWritePromise: DeferredPromise = defer(); + /** + * Boolean value specifying if channel was closed locally + */ + localClosed: boolean = false + /** * Callback to invoke when the stream is closed. */ @@ -204,7 +210,6 @@ export class WebRTCStream implements Stream { constructor (opts: StreamInitOpts) { this.channel = opts.channel this.id = this.channel.label - this.stat = opts.stat switch (this.channel.readyState) { case 'open': @@ -235,7 +240,9 @@ export class WebRTCStream implements Stream { } this.channel.onclose = (_evt) => { - this.close() + if (!this.localClosed) { + this.close() + } } this.channel.onerror = (evt) => { @@ -246,11 +253,13 @@ export class WebRTCStream implements Stream { const self = this // reader pipe - this.channel.onmessage = async ({ data }) => { - if (data === null || data.length === 0) { + this.channel.onmessage = async ({ data: d }) => { + const data = d as ArrayBuffer + if (data.byteLength === 0) { return } - this._innersrc.push(new Uint8Array(data as ArrayBufferLike)) + // console.log('incoming', this.channel.id, (data as ArrayBuffer).byteLength) + this._innersrc.push(new Uint8Array(data)) } // pipe framed protobuf messages through a length prefixed decoder, and @@ -262,6 +271,7 @@ export class WebRTCStream implements Stream { for await (const buf of source) { const message = self.processIncomingProtobuf(buf.subarray()) if (message != null) { + // console.log('read', uint8arrayToString(message)) yield new Uint8ArrayList(message) } } @@ -304,6 +314,8 @@ export class WebRTCStream implements Stream { const msgbuf = pb.Message.toBinary({ message: buf.subarray() }) const sendbuf = lengthPrefixed.encode.single(msgbuf) + // console.log('wrote ', uint8arrayToString(msgbuf)) + this.channel.send(sendbuf.subarray()) } } @@ -314,7 +326,7 @@ export class WebRTCStream implements Stream { processIncomingProtobuf (buffer: Uint8Array): Uint8Array | undefined { const message = pb.Message.fromBinary(buffer) - if (message.flag !== undefined) { + if (message.flag) { const [currentState, nextState] = this.streamState.transition({ direction: 'inbound', flag: message.flag }) if (currentState !== nextState) { @@ -341,11 +353,18 @@ export class WebRTCStream implements Stream { /** * Close a stream for reading and writing */ - close (): void { + async close (): Promise { + if (this.channel.readyState === 'closed') { + return + } + this.localClosed = true this.stat.timeline.close = new Date().getTime() this.streamState.state = StreamStates.CLOSED - this._innersrc.end() this.closeWritePromise.resolve() + // await close callback + this.channel.addEventListener('close', () => { + this._innersrc.end() + }) this.channel.close() if (this.closeCb !== undefined) { diff --git a/src/transport.ts b/src/transport.ts index 6fc10bd..63782d5 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -4,19 +4,21 @@ import type { PeerId } from '@libp2p/interface-peer-id' import { CreateListenerOptions, Listener, symbol, Transport } from '@libp2p/interface-transport' import { logger } from '@libp2p/logger' import * as p from '@libp2p/peer-id' -import type { Multiaddr } from '@multiformats/multiaddr' +import { multiaddr, Multiaddr } from '@multiformats/multiaddr' import * as multihashes from 'multihashes' import defer from 'p-defer' import { v4 as genUuid } from 'uuid' import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' import { concat } from 'uint8arrays/concat' -import { dataChannelError, inappropriateMultiaddr, unimplemented, invalidArgument } from './error.js' +import { dataChannelError, invalidArgument } from './error.js' import { WebRTCMultiaddrConnection } from './maconn.js' import { DataChannelMuxerFactory } from './muxer.js' import type { WebRTCDialOptions } from './options.js' import * as sdp from './sdp.js' import { WebRTCStream } from './stream.js' +import * as mafmt from '@multiformats/mafmt' +// import { EventEmitter } from '@libp2p/interfaces/events' const log = logger('libp2p:webrtc:transport') @@ -70,7 +72,12 @@ export class WebRTCTransport implements Transport { * Create transport listeners no supported by browsers */ createListener (options: CreateListenerOptions): Listener { - throw unimplemented('WebRTCTransport.createListener') + throw 'browser does not support listening' + // return Object.assign(new EventEmitter(), { + // listen: async (_: Multiaddr) => {}, + // getAddrs: () => [], + // close: async () => {}, + // }) } /** @@ -98,13 +105,9 @@ export class WebRTCTransport implements Transport { * Connect to a peer using a multiaddr */ async _connect (ma: Multiaddr, options: WebRTCDialOptions): Promise { - const rps = ma.getPeerId() - - if (rps === null) { - throw inappropriateMultiaddr("we need to have the remote's PeerId") - } - - const remoteCerthash = sdp.decodeCerthash(sdp.certhash(ma)) + const addr = extractDialableMa(ma) + const remotePeerId = addr.getPeerId() + const remoteCerthash = sdp.decodeCerthash(sdp.certhash(addr)) // ECDSA is preferred over RSA here. From our testing we find that P-256 elliptic // curve is supported by Pion, webrtc-rs, as well as Chromium (P-228 and P-384 @@ -160,7 +163,7 @@ export class WebRTCTransport implements Transport { await dataChannelOpenPromise.promise const myPeerId = this.components.peerId - const theirPeerId = p.peerIdFromString(rps) + const theirPeerId = remotePeerId != null ? p.peerIdFromString(remotePeerId) : undefined // Do noise handshake. // Set the Noise Prologue to libp2p-webrtc-noise: before starting the actual Noise handshake. @@ -187,7 +190,7 @@ export class WebRTCTransport implements Transport { // handshake ensures that the stream opening callback is set up const maConn = new WebRTCMultiaddrConnection({ peerConnection, - remoteAddr: ma, + remoteAddr: addr, timeline: { open: (new Date()).getTime() } @@ -198,6 +201,7 @@ export class WebRTCTransport implements Transport { // For outbound connections, the remote is expected to start the noise handshake. // Therefore, we need to secure an inbound noise connection from the remote. await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId) + // maConn.remoteAddr = maConn.remoteAddr.decapsulateCode(421).encapsulate(multiaddr(`/p2p/${secureConn.remotePeer}`)) return await options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) } @@ -238,6 +242,20 @@ export class WebRTCTransport implements Transport { * a Certhash Code (466) and a PeerId */ function validMa (ma: Multiaddr): boolean { - const codes = ma.protoCodes() - return codes.includes(WEBRTC_CODE) && codes.includes(CERTHASH_CODE) && ma.getPeerId() != null + return mafmt.WebRTC.matches(ma) +} + +/** + * + */ +function extractDialableMa (ma: Multiaddr): Multiaddr { + const addrString = ma.toString() + const certhash = addrString.match(/\/certhash\/\w+/) + const addr = addrString.split(/\/certhash\/\w+/) + let result = multiaddr(addr[0] + certhash![0]) + if (addr.length > 1 && addr[1].startsWith('/p2p')) { + const peer = addr[1].match(/\/p2p\/\w+/)![0] + result = result.encapsulate(peer) + } + return result } diff --git a/test/peer.browser.spec.ts b/test/peer.browser.spec.ts new file mode 100644 index 0000000..b4928f7 --- /dev/null +++ b/test/peer.browser.spec.ts @@ -0,0 +1,143 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions */ + +import { createLibp2p } from 'libp2p' +import { webRTC, webRTCPeer } from '../src' +import { webSockets } from '@libp2p/websockets' +import * as filters from '@libp2p/websockets/filters' +import { mplex } from '@libp2p/mplex' +import { noise } from '@chainsafe/libp2p-noise' +import { multiaddr } from '@multiformats/multiaddr' +import pDefer from 'p-defer' +import { pipe } from 'it-pipe' +import { fromString } from 'uint8arrays/from-string' +import { toString } from 'uint8arrays/to-string' +import first from 'it-first' +import { expect } from 'aegir/chai' + +describe.skip('test relay', () => { + it('can connect over ws relay', async () => { + const relayAddress = multiaddr('/ip4/192.168.1.101/tcp/4003/ws/p2p/QmceBZS6MN8uX4vvRdpTnu9Ga2uhHzRnVYLYPTnyWHkyHc') + const listener = await createLibp2p({ + transports: [ + webRTCPeer({}), + webSockets({ + filter: filters.all + }) + ], + streamMuxers: [mplex()], + connectionEncryption: [noise()], + relay: { + enabled: true, + autoRelay: { + enabled: true, + maxListeners: 2 + } + } + }) + const dialer = await createLibp2p({ + transports: [ + webRTCPeer({}), + webSockets({ + filter: filters.all + }) + ], + streamMuxers: [mplex()], + connectionEncryption: [noise()], + identify: { + timeout: 30000 + } + }) + + await listener.start() + await dialer.start() + + const relaying = pDefer() + listener.peerStore.addEventListener('change:multiaddrs', (event) => { + const { peerId } = event.detail + + // Updated self multiaddrs? + if (peerId.equals(listener.peerId)) { + const webrtcAddr = `${listener.getMultiaddrs()[0].toString()}/webrtc-peer/p2p/${peerId}` + relaying.resolve(webrtcAddr) + } + }) + + listener.handle('/echo/1.0.0', ({ stream }) => { + void pipe(stream, stream) + }) + + await listener.dial(relayAddress) + const dialAddr = multiaddr(await relaying.promise) + const stream = await dialer.dialProtocol(dialAddr, ['/echo/1.0.0']) + const input = fromString('test') + const output = await pipe( + [input], + stream, + async (source) => await first(source) + ) + expect(toString(output!.subarray())).to.equals('test') + console.log('read data', output!.subarray()) + }) + + it.only('can connect over webrtc relay', async () => { + const relayAddress = multiaddr('/ip4/192.168.1.101/udp/4004/webrtc/certhash/uEiBMYhADacrAg34fe9Vfj9cy6ZaNx5EqhLnWtol9zFvO5A/p2p/QmZxycy737ycr2vGWwT9HreJGAtNu3EGLAfwuGrMfBxVF4') + const listener = await createLibp2p({ + transports: [ + webRTCPeer({}), + webRTC() + ], + streamMuxers: [mplex()], + connectionEncryption: [noise()], + relay: { + enabled: true, + autoRelay: { + enabled: true, + maxListeners: 2 + } + } + }) + const dialer = await createLibp2p({ + transports: [ + webRTCPeer({}), + webRTC() + ], + streamMuxers: [mplex()], + connectionEncryption: [noise()], + relay: { + enabled: true + } + }) + + await listener.start() + await dialer.start() + + const relaying = pDefer() + listener.peerStore.addEventListener('change:multiaddrs', (event) => { + const { peerId } = event.detail + + if (peerId.equals(listener.peerId)) { + const webrtcAddr = `${listener.getMultiaddrs()[0].toString()}/webrtc-peer/p2p/${peerId}` + relaying.resolve(webrtcAddr) + } + }) + + listener.handle('/echo/1.0.0', ({ stream }) => { + void pipe(stream, stream) + }) + + await listener.dial(relayAddress) + const addr = await relaying.promise + const dialAddr = multiaddr(addr) + const stream = await dialer.dialProtocol(dialAddr, ['/echo/1.0.0']) + const input = fromString('test') + const output = await pipe( + [input], + stream, + async (source) => await first(source) + ) + expect(toString(output!.subarray())).to.equals('test') + console.log('read data', output!.subarray()) + }) +}) + +export {}