Class: SyncQueue

Defined in: src/lib/sync-queue.ts:64

Sync Queue for packet synchronization

A direct mapping to the SyncQueue from FFmpeg's fftools (fftools/sync_queue.h), the same synchronization mechanism the FFmpeg CLI uses. It synchronizes packets from multiple streams before muxing, prevents any stream from getting ahead of the others, and ensures proper interleaving for containers with strict timing requirements.

Example

typescript
import { Packet, SyncQueue, SyncQueueType } from 'node-av';

// Create sync queue with a 1 second buffer (the value is in microseconds)
const sq = SyncQueue.create(SyncQueueType.PACKETS, 1000000);

// Add streams
const videoIdx = sq.addStream(1);  // 1 = limiting stream
const audioIdx = sq.addStream(1);

// Send packets to queue (videoPacket and audioPacket are obtained elsewhere)
sq.send(videoIdx, videoPacket);
sq.send(audioIdx, audioPacket);

// Receive synchronized packets
const packet = new Packet();
const streamIdx = sq.receive(-1, packet);  // -1 = any stream
if (streamIdx >= 0) {
  console.log(`Received a packet for stream ${streamIdx}`);
  // Write packet to muxer...
}

// Cleanup
sq.free();

See

sync_queue.h - FFmpeg source

Implements

Disposable
NativeWrapper

Properties

native

native: NativeSyncQueue

Defined in: src/lib/sync-queue.ts:66

Internal

Methods

[dispose]()

[dispose](): void

Defined in: src/lib/sync-queue.ts:206

Returns

void

Implementation of

Disposable.[dispose]


addStream()

addStream(limiting): number

Defined in: src/lib/sync-queue.ts:107

Add a stream to the sync queue.

Parameters

limiting

number = 1

Whether this stream should limit other streams (1 = yes, 0 = no). Limiting streams control the head position: other streams cannot get ahead of them.

Returns

number

Stream index in the sync queue

Example

typescript
const videoIdx = sq.addStream(1);  // Video limits timing
const audioIdx = sq.addStream(1);  // Audio limits timing
const subtitleIdx = sq.addStream(0);  // Subtitles don't limit
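
To make the effect of the limiting flag more concrete, here is a deliberately simplified toy model. It is not FFmpeg's implementation and does not use node-av; `ToySyncQueue`, `ToyPacket`, and the rule "release a packet only if its timestamp is at or below the lowest latest timestamp among unfinished limiting streams" are assumptions chosen for illustration, and the buffer size is ignored entirely.

```typescript
// Toy model only: illustrates the idea of a "head" set by limiting streams.
interface ToyPacket {
  stream: number;
  ts: number;
}

class ToySyncQueue {
  private queues: ToyPacket[][] = [];
  private limiting: boolean[] = [];
  private lastTs: number[] = [];
  private finished: boolean[] = [];

  // Mirrors the documented convention: 1 = limiting, 0 = not limiting.
  addStream(limiting: number = 1): number {
    this.queues.push([]);
    this.limiting.push(limiting !== 0);
    this.lastTs.push(Number.NEGATIVE_INFINITY);
    this.finished.push(false);
    return this.queues.length - 1;
  }

  // A null timestamp marks the stream as finished (EOF).
  send(stream: number, ts: number | null): void {
    if (ts === null) {
      this.finished[stream] = true;
      return;
    }
    this.queues[stream].push({ stream, ts });
    this.lastTs[stream] = ts;
  }

  // Head = lowest "latest timestamp" among limiting streams still running.
  private head(): number {
    let head = Number.POSITIVE_INFINITY;
    for (let i = 0; i < this.limiting.length; i++) {
      if (this.limiting[i] && !this.finished[i]) {
        head = Math.min(head, this.lastTs[i]);
      }
    }
    return head;
  }

  // Returns the earliest buffered packet that is not ahead of the head, or null.
  receive(): ToyPacket | null {
    const head = this.head();
    let best: ToyPacket | null = null;
    for (const q of this.queues) {
      const first = q[0];
      if (first !== undefined && first.ts <= head && (best === null || first.ts < best.ts)) {
        best = first;
      }
    }
    if (best !== null) {
      this.queues[best.stream].shift();
    }
    return best;
  }
}

const toy = new ToySyncQueue();
const toyVideo = toy.addStream(1);
const toyAudio = toy.addStream(1);
toy.send(toyVideo, 0);
toy.send(toyVideo, 40);
console.log(toy.receive()); // null: audio has sent nothing, so video must wait
toy.send(toyAudio, 0);
console.log(toy.receive()); // { stream: 0, ts: 0 }
```

In this model a stream added with 0 (for example sparse subtitles) never holds the head back, which is the behaviour the parameter description suggests.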

free()

free(): void

Defined in: src/lib/sync-queue.ts:191

Free the sync queue and all buffered packets. After calling this, the queue cannot be used anymore.

Returns

void

Example

typescript
sq.free();
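
Because a freed queue cannot be used again, it can help to tie `free()` to a scope so it also runs when an error is thrown. The helper below is a minimal sketch; `Freeable` and `withResource` are hypothetical names, not part of node-av, and the callback is assumed to be synchronous.

```typescript
// Hypothetical structural type: anything exposing the documented free() method.
interface Freeable {
  free(): void;
}

// Runs `fn` with the resource and always frees it afterwards, even on error.
// Assumption: `fn` is synchronous. With an async callback, free() would run
// before the returned promise settles.
function withResource<T extends Freeable, R>(resource: T, fn: (r: T) => R): R {
  try {
    return fn(resource);
  } finally {
    resource.free();
  }
}
```

With a real queue this might look like `withResource(SyncQueue.create(), (sq) => { /* ... */ })`. Since the class implements Disposable, a `using` declaration (TypeScript 5.2 or later) is likely another way to get the same effect.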

getNative()

getNative(): NativeSyncQueue

Defined in: src/lib/sync-queue.ts:202

Internal

Get the underlying native SyncQueue object.

Returns

NativeSyncQueue

The native SyncQueue binding object

Implementation of

NativeWrapper.getNative


receive()

receive(streamIdx, packet): number

Defined in: src/lib/sync-queue.ts:178

Receive a packet from the sync queue.

This will receive the next packet that should be written to maintain proper synchronization between streams. The packet parameter is filled with the received data.

Parameters

streamIdx

number

Stream index to receive from, or -1 for any stream

packet

Packet

Packet to fill with received data (output parameter)

Returns

number

Stream index (>= 0) on success, or a negative error code:

- >= 0: Stream index that the packet belongs to
- AVERROR(EAGAIN): No packets ready yet, more input must be sent first
- AVERROR_EOF: All streams finished

Example

typescript
import { AVERROR, AVERROR_EOF, Packet } from 'node-av';

// Receive from any stream (FFmpeg mux pattern)
const packet = new Packet();
while (true) {
  const ret = sq.receive(-1, packet);
  if (ret === AVERROR('EAGAIN')) {
    break;  // No packets ready
  }
  if (ret === AVERROR_EOF) {
    break;  // All streams finished
  }
  if (ret < 0) {
    // Any other negative code is a real error; do not keep looping
    throw new Error(`SyncQueue.receive() failed with code ${ret}`);
  }
  // ret is the stream index
  await muxer.interleavedWriteFrame(packet);
}
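
The return value of `receive()` carries four distinct cases (packet, EAGAIN, EOF, other error), and it can be easier to branch on a named outcome than on raw numbers. The following is a sketch with no node-av dependency; `ReceiveOutcome` and `classifyReceive` are hypothetical names, and the caller is assumed to pass in the EAGAIN and EOF codes (for example `AVERROR('EAGAIN')` and `AVERROR_EOF`), since the EAGAIN value differs between platforms.

```typescript
// Hypothetical result type for the documented receive() return value.
type ReceiveOutcome =
  | { kind: 'packet'; streamIdx: number }
  | { kind: 'again' }
  | { kind: 'eof' }
  | { kind: 'error'; code: number };

// Maps a raw receive() return value to a named outcome.
function classifyReceive(
  ret: number,
  codes: { eagain: number; eof: number },
): ReceiveOutcome {
  if (ret >= 0) return { kind: 'packet', streamIdx: ret };
  if (ret === codes.eagain) return { kind: 'again' };
  if (ret === codes.eof) return { kind: 'eof' };
  return { kind: 'error', code: ret };
}
```

A mux loop could then `switch` on `outcome.kind`, writing the packet for `'packet'`, leaving the loop for `'again'` and `'eof'`, and throwing for `'error'`.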

send()

send(streamIdx, packet): number

Defined in: src/lib/sync-queue.ts:137

Send a packet to the sync queue.

The packet is cloned internally, so the original can be reused/freed.

To signal EOF for a stream, pass null as the packet. This tells the sync queue that no more packets will be sent for this stream.

Parameters

streamIdx

number

Stream index returned from addStream()

packet

Packet | null

Packet to send, or null to signal EOF

Returns

number

0 on success, AVERROR_EOF if no more packets should be sent for this stream, or another negative error code on failure

Example

typescript
// Send normal packet
const ret = sq.send(videoIdx, packet);
if (ret === AVERROR_EOF) {
  console.log('Stream finished');
}

// Signal EOF for stream
sq.send(videoIdx, null);
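
The send-then-signal-EOF pattern described above can be wrapped in a small helper. This is a sketch under stated assumptions: `SendQueue` is a hypothetical structural type that only mirrors the documented `send()` signature, `sendAll` is not part of node-av, and the EOF code is passed in by the caller (for example `AVERROR_EOF`).

```typescript
// Hypothetical structural type mirroring the documented send() signature.
interface SendQueue<P> {
  send(streamIdx: number, packet: P | null): number;
}

// Sends packets in order, then signals EOF with null.
// Returns how many packets the queue accepted.
function sendAll<P>(
  sq: SendQueue<P>,
  streamIdx: number,
  packets: readonly P[],
  eofCode: number,
): number {
  let accepted = 0;
  for (let i = 0; i < packets.length; i++) {
    const ret = sq.send(streamIdx, packets[i]);
    if (ret === eofCode) {
      return accepted; // queue already considers this stream finished
    }
    if (ret < 0) {
      throw new Error(`send() failed with code ${ret}`);
    }
    accepted++;
  }
  sq.send(streamIdx, null); // no more packets for this stream
  return accepted;
}
```

In a real pipeline packets usually arrive one at a time, so the loop body and the final `send(streamIdx, null)` would sit in the demux or encode loop instead of operating on an array.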

create()

static create(type, bufferSizeUs): SyncQueue

Defined in: src/lib/sync-queue.ts:87

Create a new sync queue.

Parameters

type

SyncQueueType = SyncQueueType.PACKETS

Queue type (PACKETS or FRAMES)

bufferSizeUs

number = 100000

Buffer size in microseconds (default: 100ms)

Returns

SyncQueue

New SyncQueue instance

Example

typescript
// 500ms buffer for RTSP streams
const sq = SyncQueue.create(SyncQueueType.PACKETS, 500000);
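
Since `bufferSizeUs` is expressed in microseconds, a small conversion helper can make call sites easier to read and harder to get wrong by a factor of 1000. `msToUs` is a hypothetical helper, not part of node-av.

```typescript
const US_PER_MS = 1000;

// Converts milliseconds to whole microseconds, rejecting invalid input.
function msToUs(ms: number): number {
  if (!Number.isFinite(ms) || ms < 0) {
    throw new RangeError(`invalid duration: ${ms}`);
  }
  return Math.round(ms * US_PER_MS);
}

console.log(msToUs(500)); // 500000
```

The call from the example above could then be written as `SyncQueue.create(SyncQueueType.PACKETS, msToUs(500))`.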