Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions examples/data_tracks/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import logging
import asyncio
from signal import SIGINT, SIGTERM
from livekit import rtc

# Set the following environment variables with your own values
TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


async def read_sensor() -> bytes:
# Dynamically read some sensor data...
return bytes([0xFA] * 256)


async def push_frames(track: rtc.LocalDataTrack):
while True:
logging.info("Pushing frame")
data = await read_sensor()
try:
frame = rtc.DataTrackFrame(payload=data).with_user_timestamp_now()
track.try_push(frame)
except rtc.PushFrameError as e:
logging.error("Failed to push frame: %s", e)
await asyncio.sleep(0.5)


async def main(room: rtc.Room):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

await room.connect(URL, TOKEN)
logger.info("connected to room %s", room.name)

track = await room.local_participant.publish_data_track("my_sensor_data")
await push_frames(track)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.FileHandler("publisher.log"),
logging.StreamHandler(),
],
)

loop = asyncio.get_event_loop()
room = rtc.Room(loop=loop)

main_task = asyncio.ensure_future(main(room))

async def cleanup():
main_task.cancel()
try:
await main_task
except asyncio.CancelledError:
pass
await room.disconnect()
loop.stop()

for signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))

try:
loop.run_forever()
finally:
loop.close()
66 changes: 66 additions & 0 deletions examples/data_tracks/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import logging
import asyncio
from signal import SIGINT, SIGTERM
from livekit import rtc

# Set the following environment variables with your own values
TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


async def subscribe(track: rtc.RemoteDataTrack):
logging.info(
"Subscribing to '%s' published by '%s'",
track.info.name,
track.publisher_identity,
)
subscription = await track.subscribe()
async for frame in subscription:
logging.info("Received frame (%d bytes)", len(frame.payload))

latency = frame.duration_since_timestamp()
if latency is not None:
logging.info("Latency: %.3f s", latency)


async def main(room: rtc.Room):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

active_tasks = []

@room.on("remote_data_track_published")
def on_remote_data_track_published(track: rtc.RemoteDataTrack):
task = asyncio.create_task(subscribe(track))
active_tasks.append(task)
task.add_done_callback(lambda _: active_tasks.remove(task))

await room.connect(URL, TOKEN)
logger.info("connected to room %s", room.name)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.FileHandler("subscriber.log"),
logging.StreamHandler(),
],
)

loop = asyncio.get_event_loop()
room = rtc.Room(loop=loop)

async def cleanup():
await room.disconnect()
loop.stop()

asyncio.ensure_future(main(room))
for signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))

try:
loop.run_forever()
finally:
loop.close()
78 changes: 36 additions & 42 deletions livekit-protocol/livekit/protocol/agent.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading