What is the best way to do live video/audio analytics while video conferencing?

I’ve been using two Android phones to video conference with each other. The video and audio MJR files are saved on the Janus server. I need to analyze the video/audio before the conference finishes, i.e., live speech transcription and face detection. To do that, I currently have to use janus-pp-rec to convert the MJR files to MP4/WAV files (see the sketch after the list below), and only then run the analytics. As discussed in this previous question, I cannot use the MJR files while they are still being recorded. Here are the naive options I have in mind:

  1. Split the video MJR recordings into chunks, e.g., about 10 seconds each. I did not find a way to do that, so I guess I’d have to modify the source code (record.c) to enable this chunking capability. Please let me know where I should focus if this is the best option.
  2. My live analytics pipeline can take both WAV files and a WebSocket stream as input. I assume I can add a monitor/viewer bot that receives and decodes the conference media from both phones by setting up a valid Streaming plugin configuration file. Do you have a suggestion on how to do that? Sorry, I’m very new to Janus. If I set up a Streaming plugin config file, what do I get on the mountpoint’s port? Can I get decoded audio/video instead of raw RTP packets? If I could get decoded waveform audio data or decoded RGB images, that would be exactly what I need.
  3. Based on the current setup that enables two-phone video conferencing, add one more bot client to the conference room on the server side. Would this be the simplest option?
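
For reference (re: the conversion step mentioned above), this is the post-processing I currently run once a recording has finished; a minimal sketch, assuming janus-pp-rec is on the PATH and picks the output container from the target file’s extension:

import subprocess

def postprocess(mjr_path, out_path):
    # janus-pp-rec turns a finished .mjr recording into a playable file;
    # it only works on complete recordings, not files still being written.
    subprocess.run(["janus-pp-rec", mjr_path, out_path], check=True)

postprocess("video.mjr", "video.mp4")  # H.264 video -> MP4
postprocess("audio.mjr", "audio.wav")  # G.711 audio -> WAV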

Please let me know if you have additional (easier) suggestions. Any comments would be appreciated.

That’s what RTP forwarders are for, which is a feature a few plugins have. They forward the RTP packets sent by users to an address you specify, from which you can process them any way you want. It’s what we use for live transcriptions and other things ourselves. Depending on the plugin you’re using, you can refer to its documentation for more details.
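
For the VideoRoom plugin, for instance, the forward is requested with a plugin message along these lines; a minimal sketch, where the room, publisher ID, host, ports and secret are all placeholders:

# Sent as the "body" of a {"janus": "message"} request on a VideoRoom handle.
# Note that Janus answers rtp_forward synchronously with "success", not "ack".
rtp_forward_request = {
    "request": "rtp_forward",
    "room": 1234,             # placeholder room ID
    "publisher_id": 5678,     # placeholder publisher ID
    "host": "192.168.1.34",   # where the RTP packets should be sent
    "audio_port": 5002,
    "video_port": 5004,
    "secret": "adminpwd",     # room secret, if one is configured
}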


I see, thank you so much. The RTP forwarder is exactly what I want. I’m using the VideoRoom plugin with the room configuration shown below. I tried the following Python code to decode the audio/video from all participants, but I cannot get it to work: only one of the two decoded WAV files is playable, and it has no sound in it; the video is not decodable at all. If you could share your plugin config and RTP decoding code, that would be very helpful; if that’s not appropriate, I totally understand. Could you instead share the best example you think would help me out, like the previous discussions? Also, do I need to write a separate RTP forwarder section in my janus.plugin.videoroom.jcfg? Thanks.

room-667788: {
  description = "example Room"
  secret = "adminpwd"
  publishers = 6
  bitrate = 512000
  fir_freq = 10
  audiocodec = "pcmu"
  videocodec = "h264"
  record = true
  rec_dir = "streaming_analytics_server/janus_recordings"
}

import requests
import socket
import threading
import subprocess
import time
import os
import uuid

# --------------------- CONFIG ---------------------
JANUS_URL = "http://localhost:8088/janus"
ROOM_ID = 667788
JANUS_SECRET = "adminpwd"
HOST_IP = "192.168.1.34"  # IP Janus forwards to (your machine)
BASE_PORT = 65004         # Starting RTP port (must be even)
RECV_DURATION = 5         # Seconds to receive and record streams
# ---------------------------------------------------

session_id = None
handle_id = None

def janus_request(endpoint, payload):
    try:
        response = requests.post(endpoint, json=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        print(f"[ERROR] Janus request failed: {e}")
        return {}

def generate_transaction():
    return str(uuid.uuid4())

def create_session():
    global session_id
    print("[INFO] Creating session...")
    res = janus_request(JANUS_URL, {"janus": "create", "transaction": generate_transaction()})
    if "data" in res and "id" in res["data"]:
        session_id = res["data"]["id"]
        print(f"[INFO] Created session: {session_id}")
        return session_id
    raise RuntimeError("Failed to create session")

def attach_plugin():
    global handle_id
    print("[INFO] Attaching plugin...")
    endpoint = f"{JANUS_URL}/{session_id}"
    payload = {
        "janus": "attach",
        "plugin": "janus.plugin.videoroom",
        "transaction": generate_transaction()
    }
    res = janus_request(endpoint, payload)
    if "data" in res and "id" in res["data"]:
        handle_id = res["data"]["id"]
        print(f"[INFO] Attached to plugin, handle: {handle_id}")
        return handle_id
    raise RuntimeError("Failed to attach plugin")

def list_participants():
    print("[INFO] Listing participants...")
    endpoint = f"{JANUS_URL}/{session_id}/{handle_id}"
    payload = {
        "janus": "message",
        "body": {
            "request": "listparticipants",
            "room": ROOM_ID
        },
        "transaction": generate_transaction()
    }
    res = janus_request(endpoint, payload)
    try:
        participants = res["plugindata"]["data"]["participants"]
        return [p["id"] for p in participants]
    except KeyError:
        print("[WARNING] No participants found.")
        return []

def rtp_forward(publisher_id, audio_port, video_port):
    print(f"[INFO] Setting up RTP forward for publisher {publisher_id}")
    endpoint = f"{JANUS_URL}/{session_id}/{handle_id}"
    payload = {
        "janus": "message",
        "body": {
            "request": "rtp_forward",
            "room": ROOM_ID,
            "publisher_id": publisher_id,
            "host": HOST_IP,
            "audio_port": audio_port,
            "video_port": video_port,
            "secret": JANUS_SECRET
        },
        "transaction": generate_transaction()
    }
    res = janus_request(endpoint, payload)
    # rtp_forward is handled synchronously, so Janus replies with "success"
    # (not "ack"); errors show up inside plugindata.
    data = res.get("plugindata", {}).get("data", {})
    if res.get("janus") != "success" or "error" in data:
        print(f"[ERROR] RTP forward failed for publisher {publisher_id}: {res}")
    return res

def save_rtp_stream(port, filename, duration):
    print(f"[INFO] Listening on port {port}, saving to {filename}")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("0.0.0.0", port))
        sock.settimeout(1.0)  # so the loop can exit even if no packets arrive
        end_time = time.time() + duration
        # Note: this writes raw, concatenated RTP packets as a debugging aid;
        # the actual decoding is done by ffmpeg in convert_to_media().
        with open(filename, "wb") as f:
            while time.time() < end_time:
                try:
                    data, _ = sock.recvfrom(4096)
                    f.write(data)
                except socket.timeout:
                    continue
                except OSError as e:
                    print(f"[ERROR] Receiving RTP on port {port}: {e}")
                    break
    finally:
        sock.close()
        print(f"[INFO] Finished receiving on port {port}")

def generate_sdp_file(filename, ip, port, media_type):
    sdp_lines = [
        "v=0",
        "o=- 0 0 IN IP4 127.0.0.1",
        "s=RTP Stream",
        "t=0 0",
        f"c=IN IP4 {ip}"
    ]

    if media_type == "audio":
        sdp_lines.append(f"m=audio {port} RTP/AVP 0")
        sdp_lines.append("a=rtpmap:0 PCMU/8000")
    elif media_type == "video":
        sdp_lines.append(f"m=video {port} RTP/AVP 96")
        sdp_lines.append("a=rtpmap:96 H264/90000")
        sdp_lines.append("a=fmtp:96 packetization-mode=1")

    with open(filename, "w") as f:
        f.write("\n".join(sdp_lines) + "\n")

def convert_to_media(audio_file, video_file, audio_port, video_port, output_prefix):
    audio_sdp = f"{output_prefix}_audio.sdp"
    video_sdp = f"{output_prefix}_video.sdp"
    audio_out = f"{output_prefix}_audio.wav"
    video_out = f"{output_prefix}_video.mp4"

    generate_sdp_file(audio_sdp, HOST_IP, audio_port, "audio")
    generate_sdp_file(video_sdp, HOST_IP, video_port, "video")

    print(f"[INFO] Converting RTP to audio: {audio_out}")
    ret1 = subprocess.run([
        "ffmpeg", "-protocol_whitelist", "file,udp,rtp", "-i", audio_sdp,
        "-acodec", "pcm_s16le", "-ar", "8000", "-ac", "1",
        audio_out
    ])

    print(f"[INFO] Converting RTP to video: {video_out}")
    ret2 = subprocess.run([
        "ffmpeg", "-protocol_whitelist", "file,udp,rtp", "-i", video_sdp,
        "-vcodec", "copy",
        video_out
    ])

    if ret1.returncode != 0:
        print(f"[ERROR] ffmpeg audio conversion failed for {audio_file}")
    if ret2.returncode != 0:
        print(f"[ERROR] ffmpeg video conversion failed for {video_file}")

    # Optionally cleanup
    os.remove(audio_sdp)
    os.remove(video_sdp)

def main():
    try:
        create_session()
        attach_plugin()
        publishers = list_participants()

        if not publishers:
            print("[INFO] No active publishers found.")
            return

        threads = []
        port = BASE_PORT
        stream_info = []

        for pub_id in publishers:
            audio_port = port
            video_port = port + 2
            port += 4

            rtp_forward(pub_id, audio_port, video_port)

            audio_file = f"publisher_{pub_id}_audio.rtp"
            video_file = f"publisher_{pub_id}_video.rtp"
            output_prefix = f"publisher_{pub_id}"

            t1 = threading.Thread(target=save_rtp_stream, args=(audio_port, audio_file, RECV_DURATION))
            t2 = threading.Thread(target=save_rtp_stream, args=(video_port, video_file, RECV_DURATION))
            threads.extend([t1, t2])
            stream_info.append((audio_file, video_file, audio_port, video_port, output_prefix))

            t1.start()
            t2.start()

        for t in threads:
            t.join()

        for audio_file, video_file, audio_port, video_port, output_prefix in stream_info:
            convert_to_media(audio_file, video_file, audio_port, video_port, output_prefix)

    except Exception as e:
        print(f"[FATAL ERROR] {e}")

if __name__ == "__main__":
    main()

For your reference, below is the error I got when running the Python code above to decode the RTP-forwarded streams. Please let me know if you have any thoughts. Thanks.

I don’t have examples I can share. As long as your recipient understands RTP and can extract/process the frames, it will work. This tutorial I gave at ClueCon a few years ago may make it easier to understand.
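
To illustrate what “understands RTP” involves, here is a minimal sketch that strips the fixed RTP header defined in RFC 3550 from each forwarded UDP packet to get at the codec payload; the bind address and port are placeholders:

import socket
import struct

def rtp_payload(packet):
    # Fixed 12-byte RTP header (RFC 3550): V/P/X/CC, M/PT, seq, timestamp, SSRC.
    if len(packet) < 12:
        return None
    first_byte = packet[0]
    csrc_count = first_byte & 0x0F
    offset = 12 + 4 * csrc_count  # skip any CSRC identifiers
    if first_byte & 0x10:  # header extension present
        if len(packet) < offset + 4:
            return None
        ext_words = struct.unpack("!H", packet[offset + 2:offset + 4])[0]
        offset += 4 + 4 * ext_words
    return packet[offset:]

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", 5002))  # placeholder: the port audio was forwarded to
while True:
    data, _ = sock.recvfrom(2048)
    payload = rtp_payload(data)
    # For PCMU, 'payload' is raw G.711 u-law audio at 8 kHz, ready to feed
    # into a transcription pipeline.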


Thank you so much, @lorenzo. I successfully used Python to implement the RTP forwarding and decoded the forwarded video/audio streams into MP4/WAV files. I will open-source how I did it and share the repo here later.

Now, I have another question regarding the live analytics part, e.g., the live transcription you mentioned. What’s the best way to stream/send the live transcription back to publishers and subscribers?

Still, I’m using the VideoRoom plugin, and there are multiple participants in the room. When the server uses AI models to transcribe the published audio, I want all participants to see the live transcription before a publisher finishes talking. This is unidirectional communication from the server to all participants. Currently, I’m considering the following two options:

  1. Video room + text room: create a separate text room. All participants in the current video room would also have to join the text room as subscribers. This way, the server can run a text bot that sends the live transcriptions into the text room (see the sketch after this list).

  2. Video room + data channels: all participants in the current video room would also have to open a DataChannel when joining.
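
For option 1, I sketched what the server-side bot might look like; a minimal sketch, assuming the TextRoom plugin’s announcement request (which, as I understand it, lets the server push a message into a room via the Janus API given the room secret, without the bot opening a DataChannel itself). The endpoint, IDs and secret are placeholders:

import requests
import uuid

JANUS_URL = "http://localhost:8088/janus"  # placeholder Janus HTTP endpoint

def send_transcript(session_id, handle_id, room, secret, text):
    # handle_id must be attached to janus.plugin.textroom; participants
    # receive the text as an "announcement" event over their DataChannels.
    payload = {
        "janus": "message",
        "transaction": str(uuid.uuid4()),
        "body": {
            "request": "announcement",
            "room": room,
            "secret": secret,
            "text": text,
        },
    }
    requests.post(f"{JANUS_URL}/{session_id}/{handle_id}", json=payload, timeout=10)

Either way, every participant still needs an open DataChannel to receive the text, so the two options mostly differ in whether the messages travel through a separate text room or piggyback on the video room.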

Please let me know which of the two options makes more sense to you, or if you have a different thought. Thank you so much again.