I see, thank you so much. The RTP forwarder is exactly what I want. I’m using the videoroom plugin configuration shown below. I tried to use the following Python code to decode the audio/video from all participants. However, I cannot get it to work: only one of the two decoded WAV audio files is playable, and it contains no sound; the video cannot be decoded at all. If you could share your plugin config and RTP decoding code, that would be very helpful. If that’s not appropriate, I totally understand. Could you share the best example you think would help me out, such as previous discussions? Do I need to write a separate RTP forwarder section in my janus.plugin.videoroom.jcfg? Thanks.
# VideoRoom room definition. The numeric part of the section name (667788) is
# the room ID that the Python script below uses as ROOM_ID.
room-667788: {
description = "example Room";
# Same value the script sends as "secret" in its rtp_forward request.
secret = "adminpwd";
publishers = "6";
bitrate = "512000";
fir_freq = "10";
# The codecs below match the SDP the script generates (PCMU/8000 and H264/90000).
audiocodec = "PCMU";
videocodec = "h264";
record = "true";
rec_dir = "streaming_analytics_server/janus_recordings";
# NOTE(review): this does not look like a documented VideoRoom room option;
# the script requests forwarders at runtime through its rtp_forward() call.
# Confirm against the plugin documentation whether this line has any effect.
rtp_forward = true;
};
import requests
import socket
import threading
import subprocess
import time
import os
import uuid
# --------------------- CONFIG ---------------------
# Base URL of the Janus HTTP API; session and handle IDs are appended to it.
JANUS_URL = "http://localhost:8088/janus"
# Room whose participants are listed and forwarded.
ROOM_ID = 667788
# Sent as the "secret" field of every rtp_forward request.
JANUS_SECRET = "adminpwd"
HOST_IP = "192.168.1.34" # IP Janus forwards to (your machine)
# main() gives each publisher four consecutive port numbers starting here:
# audio on the first, video on the first + 2.
BASE_PORT = 65004 # Starting RTP port (must be even)
RECV_DURATION = 5 # Seconds to receive and record streams
# ---------------------------------------------------
# Filled in by create_session() and attach_plugin() respectively; None until then.
session_id = None
handle_id = None
def janus_request(endpoint, payload, timeout=10):
    """POST a JSON payload to a Janus HTTP endpoint and return the decoded reply.

    Args:
        endpoint: Full URL to post to.
        payload: JSON-serialisable request body.
        timeout: Seconds to wait for the server before giving up, so that an
            unreachable or stalled Janus cannot block the script forever.

    Returns:
        The decoded JSON response, or an empty dict when the request fails
        (connection error, timeout, HTTP error status or a non-JSON body).
    """
    try:
        response = requests.post(endpoint, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # JSONDecodeError does not derive from RequestException.
        print(f"[ERROR] Janus request failed: {e}")
        return {}
def generate_transaction():
    """Return a fresh random UUID, as a string, for use as a transaction ID."""
    transaction_id = uuid.uuid4()
    return f"{transaction_id}"
def create_session():
    """Open a new Janus session and store its ID in the global ``session_id``.

    Returns:
        The ID of the newly created session.

    Raises:
        RuntimeError: If the reply does not carry an ID under ``data.id``.
    """
    global session_id
    print("[INFO] Creating session...")
    request = {"janus": "create", "transaction": generate_transaction()}
    reply = janus_request(JANUS_URL, request)
    # Guard clause: a reply without data.id counts as a failed creation.
    if "data" not in reply or "id" not in reply["data"]:
        raise RuntimeError("Failed to create session")
    session_id = reply["data"]["id"]
    print(f"[INFO] Created session: {session_id}")
    return session_id
def attach_plugin():
    """Attach a VideoRoom plugin handle to the current session.

    The handle ID is stored in the global ``handle_id``.

    Returns:
        The ID of the newly attached plugin handle.

    Raises:
        RuntimeError: If the reply does not carry an ID under ``data.id``.
    """
    global handle_id
    print("[INFO] Attaching plugin...")
    reply = janus_request(
        f"{JANUS_URL}/{session_id}",
        {
            "janus": "attach",
            "plugin": "janus.plugin.videoroom",
            "transaction": generate_transaction(),
        },
    )
    # Guard clause: a reply without data.id counts as a failed attach.
    if "data" not in reply or "id" not in reply["data"]:
        raise RuntimeError("Failed to attach plugin")
    handle_id = reply["data"]["id"]
    print(f"[INFO] Attached to plugin, handle: {handle_id}")
    return handle_id
def list_participants():
    """Return the IDs of the participants of ``ROOM_ID`` that can be forwarded.

    Sends a ``listparticipants`` request through the attached plugin handle.
    The caller treats the result as the set of publishers to forward, so
    participants explicitly reported with ``"publisher": false`` are skipped;
    entries that carry no ``publisher`` flag are kept.

    Returns:
        A list of participant IDs, or an empty list when the reply carries no
        usable participant list.
    """
    print("[INFO] Listing participants...")
    endpoint = f"{JANUS_URL}/{session_id}/{handle_id}"
    payload = {
        "janus": "message",
        "body": {
            "request": "listparticipants",
            "room": ROOM_ID,
        },
        "transaction": generate_transaction(),
    }
    res = janus_request(endpoint, payload)
    try:
        participants = res["plugindata"]["data"]["participants"]
        return [p["id"] for p in participants if p.get("publisher", True)]
    except (KeyError, TypeError, AttributeError):
        # KeyError: an expected key is missing (e.g. an error reply).
        # TypeError/AttributeError: some level of the reply is not the
        # dict/list shape this code expects.
        print("[WARNING] No participants found.")
        return []
def rtp_forward(publisher_id, audio_port, video_port, audio_pt=None, video_pt=96):
    """Ask Janus to forward one publisher's RTP to ``HOST_IP``.

    Args:
        publisher_id: ID of the publisher whose media should be forwarded.
        audio_port: UDP port on ``HOST_IP`` that receives the audio RTP.
        video_port: UDP port on ``HOST_IP`` that receives the video RTP.
        audio_pt: Payload type to request for the forwarded audio, or None to
            leave the request without an ``audio_pt`` field.
        video_pt: Payload type to request for the forwarded video, or None to
            leave the request without a ``video_pt`` field.  Defaults to 96,
            the payload type this script advertises in its generated video
            SDP, so the forwarded packets match what the decoder is told to
            expect.

    Returns:
        The raw reply dict from Janus (empty if the HTTP request failed).
    """
    print(f"[INFO] Setting up RTP forward for publisher {publisher_id}")
    endpoint = f"{JANUS_URL}/{session_id}/{handle_id}"
    body = {
        "request": "rtp_forward",
        "room": ROOM_ID,
        "publisher_id": publisher_id,
        "host": HOST_IP,
        "audio_port": audio_port,
        "video_port": video_port,
        "secret": JANUS_SECRET,
    }
    if audio_pt is not None:
        body["audio_pt"] = audio_pt
    if video_pt is not None:
        body["video_pt"] = video_pt
    payload = {
        "janus": "message",
        "body": body,
        "transaction": generate_transaction(),
    }
    res = janus_request(endpoint, payload)
    # rtp_forward is answered synchronously: a good reply is "success" with
    # the plugin's data attached (never "ack"), and a plugin-level failure is
    # reported inside that data as error/error_code.
    data = (res.get("plugindata") or {}).get("data") or {}
    failed = (
        res.get("janus") != "success"
        or "error" in data
        or "error_code" in data
    )
    if failed:
        print(f"[ERROR] RTP forward failed for publisher {publisher_id}: {res}")
    return res
def save_rtp_stream(port, filename, duration):
    """Dump the UDP datagrams arriving on ``port`` into ``filename``.

    Listens on all interfaces for ``duration`` seconds and appends the payload
    of every received datagram to the file as-is.  No per-packet framing is
    written, so packet boundaries cannot be recovered from the dump.

    Args:
        port: Local UDP port to bind.
        filename: Path of the output file (overwritten).
        duration: Length of the capture window in seconds.
    """
    print(f"[INFO] Listening on port {port}, saving to {filename}")
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind(("0.0.0.0", port))
            end_time = time.time() + duration
            with open(filename, "wb") as f:
                while True:
                    remaining = end_time - time.time()
                    if remaining <= 0:
                        break
                    # A blocking recvfrom() never returns on a silent port,
                    # which would keep this thread alive past the deadline;
                    # bound every wait by the time left in the window.
                    sock.settimeout(remaining)
                    try:
                        data, _ = sock.recvfrom(4096)
                        f.write(data)
                    except socket.timeout:
                        break  # capture window elapsed with no more packets
                    except OSError as e:
                        print(f"[ERROR] Receiving RTP on port {port}: {e}")
                        break
    finally:
        print(f"[INFO] Finished receiving on port {port}")
def generate_sdp_file(filename, ip, port, media_type):
    """Write a minimal single-stream SDP description to ``filename``.

    Args:
        filename: Path of the SDP file to create (overwritten).
        ip: Address placed in the session-level ``c=`` line.
        port: Port placed in the ``m=`` line.
        media_type: ``"audio"`` (PCMU/8000, payload type 0) or ``"video"``
            (H264/90000, payload type 96, packetization-mode 1).

    Raises:
        ValueError: If ``media_type`` is neither ``"audio"`` nor ``"video"``.
            Nothing is written in that case, instead of an SDP without any
            media section.
    """
    if media_type == "audio":
        media_lines = [
            f"m=audio {port} RTP/AVP 0",
            "a=rtpmap:0 PCMU/8000",
        ]
    elif media_type == "video":
        media_lines = [
            f"m=video {port} RTP/AVP 96",
            "a=rtpmap:96 H264/90000",
            "a=fmtp:96 packetization-mode=1",
        ]
    else:
        raise ValueError(f"Unsupported media type: {media_type!r}")

    # SDP fixes the order of session-level fields: the connection line (c=)
    # comes before the timing line (t=).
    sdp_lines = [
        "v=0",
        "o=- 0 0 IN IP4 127.0.0.1",
        "s=RTP Stream",
        f"c=IN IP4 {ip}",
        "t=0 0",
        *media_lines,
    ]
    with open(filename, "w") as f:
        f.write("\n".join(sdp_lines) + "\n")
def _run_ffmpeg(sdp_path, codec_args, output_path, duration, grace=10):
    """Run one ffmpeg capture described by ``sdp_path``.

    Returns ffmpeg's exit code, or None if it had to be killed because it was
    still running ``grace`` seconds after the capture window should have ended
    (for example when no packets ever arrive).
    """
    command = [
        "ffmpeg",
        "-nostdin",  # never wait for keyboard input
        "-y",  # overwrite an existing output instead of prompting
        "-protocol_whitelist", "file,udp,rtp",
        "-i", sdp_path,
        "-t", str(duration),  # a live RTP input has no natural end
        *codec_args,
        output_path,
    ]
    try:
        return subprocess.run(command, timeout=duration + grace).returncode
    except subprocess.TimeoutExpired:
        return None


def convert_to_media(audio_file, video_file, audio_port, video_port, output_prefix, duration=None):
    """Capture one publisher's live RTP with ffmpeg into WAV and MP4 files.

    ffmpeg listens on the given ports through temporary SDP files named after
    ``output_prefix``; it does not read ``audio_file`` or ``video_file``, which
    are only mentioned in the error messages.  The two captures run one after
    the other, each limited to ``duration`` seconds.

    Args:
        audio_file: Name of the raw audio dump, used in error messages only.
        video_file: Name of the raw video dump, used in error messages only.
        audio_port: UDP port the audio RTP is forwarded to.
        video_port: UDP port the video RTP is forwarded to.
        output_prefix: Prefix for the generated SDP, WAV and MP4 file names.
        duration: Seconds to capture per stream; defaults to ``RECV_DURATION``.
    """
    if duration is None:
        duration = RECV_DURATION
    audio_sdp = f"{output_prefix}_audio.sdp"
    video_sdp = f"{output_prefix}_video.sdp"
    audio_out = f"{output_prefix}_audio.wav"
    video_out = f"{output_prefix}_video.mp4"
    try:
        generate_sdp_file(audio_sdp, HOST_IP, audio_port, "audio")
        generate_sdp_file(video_sdp, HOST_IP, video_port, "video")
        print(f"[INFO] Converting RTP to audio: {audio_out}")
        ret1 = _run_ffmpeg(
            audio_sdp,
            ["-acodec", "pcm_s16le", "-ar", "8000", "-ac", "1"],
            audio_out,
            duration,
        )
        print(f"[INFO] Converting RTP to video: {video_out}")
        # NOTE(review): with "-vcodec copy" the result is presumably only
        # decodable if a keyframe arrives during the capture window - confirm.
        ret2 = _run_ffmpeg(video_sdp, ["-vcodec", "copy"], video_out, duration)
        if ret1 != 0:
            print(f"[ERROR] ffmpeg audio conversion failed for {audio_file}")
        if ret2 != 0:
            print(f"[ERROR] ffmpeg video conversion failed for {video_file}")
    finally:
        # Remove the temporary SDP files even when a step above raised.
        for sdp_path in (audio_sdp, video_sdp):
            if os.path.exists(sdp_path):
                os.remove(sdp_path)
def _destroy_session():
    """Ask Janus to destroy the session opened by create_session(), if any."""
    global session_id, handle_id
    if session_id is None:
        return
    janus_request(
        f"{JANUS_URL}/{session_id}",
        {"janus": "destroy", "transaction": generate_transaction()},
    )
    session_id = None
    handle_id = None


def main():
    """Forward, record and convert the media of every publisher in the room.

    Creates a session and plugin handle, lists the room's participants and,
    for each one, starts two UDP listeners (audio and video) and requests an
    RTP forward to them.  Once every listener has finished, the streams are
    converted with ffmpeg.  Any error is reported on stdout rather than
    raised, and the Janus session is destroyed on the way out.
    """
    try:
        create_session()
        attach_plugin()
        publishers = list_participants()
        if not publishers:
            print("[INFO] No active publishers found.")
            return
        threads = []
        stream_info = []
        for index, pub_id in enumerate(publishers):
            # Four port numbers per publisher: audio on the first, video on
            # the first + 2.
            audio_port = BASE_PORT + 4 * index
            video_port = audio_port + 2
            audio_file = f"publisher_{pub_id}_audio.rtp"
            video_file = f"publisher_{pub_id}_video.rtp"
            output_prefix = f"publisher_{pub_id}"
            t1 = threading.Thread(target=save_rtp_stream, args=(audio_port, audio_file, RECV_DURATION))
            t2 = threading.Thread(target=save_rtp_stream, args=(video_port, video_file, RECV_DURATION))
            # Start the listeners before asking Janus to forward, so the
            # ports are most likely already bound when packets start arriving.
            t1.start()
            t2.start()
            threads.extend([t1, t2])
            rtp_forward(pub_id, audio_port, video_port)
            stream_info.append((audio_file, video_file, audio_port, video_port, output_prefix))
        for t in threads:
            t.join()
        for audio_file, video_file, audio_port, video_port, output_prefix in stream_info:
            convert_to_media(audio_file, video_file, audio_port, video_port, output_prefix)
        # NOTE(review): the forwarders requested above are never stopped;
        # confirm whether repeated runs accumulate forwarders on Janus.
    except Exception as e:
        print(f"[FATAL ERROR] {e}")
    finally:
        # Do not leave the session open on the server after the script ends.
        _destroy_session()


if __name__ == "__main__":
    main()