Logo Search packages:      
Sourcecode: helix-player version File versions

rtptran.cpp

/* ***** BEGIN LICENSE BLOCK *****
 * Source last modified: $Id: rtptran.cpp,v 1.54.2.2 2005/01/03 19:00:03 rishimathew Exp $
 * 
 * Portions Copyright (c) 1995-2004 RealNetworks, Inc. All Rights Reserved.
 * 
 * The contents of this file, and the files included with this file,
 * are subject to the current version of the RealNetworks Public
 * Source License (the "RPSL") available at
 * http://www.helixcommunity.org/content/rpsl unless you have licensed
 * the file under the current version of the RealNetworks Community
 * Source License (the "RCSL") available at
 * http://www.helixcommunity.org/content/rcsl, in which case the RCSL
 * will apply. You may also obtain the license terms directly from
 * RealNetworks.  You may not use this file except in compliance with
 * the RPSL or, if you have a valid RCSL with RealNetworks applicable
 * to this file, the RCSL.  Please see the applicable RPSL or RCSL for
 * the rights, obligations and limitations governing use of the
 * contents of the file.
 * 
 * Alternatively, the contents of this file may be used under the
 * terms of the GNU General Public License Version 2 or later (the
 * "GPL") in which case the provisions of the GPL are applicable
 * instead of those above. If you wish to allow use of your version of
 * this file only under the terms of the GPL, and not to allow others
 * to use your version of this file under the terms of either the RPSL
 * or RCSL, indicate your decision by deleting the provisions above
 * and replace them with the notice and other provisions required by
 * the GPL. If you do not delete the provisions above, a recipient may
 * use your version of this file under the terms of any one of the
 * RPSL, the RCSL or the GPL.
 * 
 * This file is part of the Helix DNA Technology. RealNetworks is the
 * developer of the Original Code and owns the copyrights in the
 * portions it created.
 * 
 * This file, and the files included with this file, is distributed
 * and made available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY
 * KIND, EITHER EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS
 * ALL SUCH WARRANTIES, INCLUDING WITHOUT LIMITATION, ANY WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, QUIET
 * ENJOYMENT OR NON-INFRINGEMENT.
 * 
 * Technology Compatibility Kit Test Suite(s) Location:
 *    http://www.helixcommunity.org/content/tck
 * 
 * Contributor(s):
 * 
 * ***** END LICENSE BLOCK ***** */

/****************************************************************************
 *  Defines
 */
#define ACCEPTABLE_SYNC_NOISE 3
#define STREAM_END_DELAY_RTP_TOLERANCE    500

/****************************************************************************
 *  Includes
 */
#include "hxtypes.h"
#include "hxassert.h"
#include "debug.h"
#include "hxcom.h"
#include "hxmarsh.h"
#include "hxstrutl.h"
#include "netbyte.h"
#include "hxengin.h"
#include "ihxpckts.h"
#include "hxsbuffer.h"
#include "hxcomm.h"
#include "hxmon.h"
#include "netbyte.h"
#include "hxstring.h"
#include "chxpckts.h"
#include "hxslist.h"
#include "hxmap.h"
#include "hxdeque.h"
#include "hxbitset.h"
#include "timebuff.h"
#include "timeval.h"
#include "tconverter.h"
#include "rtptypes.h"
#include "hxqosinfo.h"
#include "hxqossig.h"
#include "hxqos.h"
//#include "hxcorgui.h"

#include "ntptime.h"

#include "rtspif.h"
#include "rtsptran.h"
#include "rtptran.h"
#include "rtpwrap.h"    // Wrappers for PMC generated base classes
#include "basepkt.h"
#include "hxtbuf.h"
#include "transbuf.h"
#include "hxtick.h"
#include "random32.h"   // random32()
#include "pkthndlr.h"   // in rtpmisc for RTCP routine
#include "rtcputil.h"   // takes care of RTCP in RTP mode
#include "rtspmsg.h"
#include "hxprefs.h"    // IHXPreferences
#include "hxmime.h"
#include "hxcore.h"

#include "hxheap.h"
#ifdef PAULM_IHXTCPSCAR
#include "objdbg.h"
#endif

#ifdef _DEBUG
#undef HX_THIS_FILE           
static const char HX_THIS_FILE[] = __FILE__;
#endif

#include "bufnum.h"

#define MAX_STARTINFO_WAIT_TIME           20000       // in milliseconds
#define MIN_NUM_PACKETS_SCANNED_FOR_LIVE_START    5
#define MAX_NUM_PACKETS_GAPPED_FOR_LIVE_START     1

static const UINT32 NORMAL_ACK_INTERVAL = 1000;        // 1/sec
static const UINT32 MINIMUM_ACK_INTERVAL = 200;        // wait 200msecs

static const UINT32 NORMAL_REPORT_INTERVAL = 5000;    // 1 per 5secs

static const UINT32 TRANSPORT_BUF_GROWTH_RATE  = 1000;
static const UINT32 LATENCY_REPORT_INTERVAL_MS = 1000;

static const UINT32 RTP_NAT_TIMEOUT = 15000; // Default timeout period


static UINT32 GetNATTimeout(IUnknown* pContext)
{
    UINT32 ret = RTP_NAT_TIMEOUT;

    IHXPreferences* pPrefs = NULL;
    
    if (pContext &&
      (HXR_OK == pContext->QueryInterface(IID_IHXPreferences, 
                                  (void**)&pPrefs)))
    {
      IHXBuffer* pPrefBuf = NULL;
      
      if ((HXR_OK == pPrefs->ReadPref("UDPNATTimeout", pPrefBuf)) &&
          pPrefBuf)
      {
          int tmp = atoi((const char*)pPrefBuf->GetBuffer());

          if (tmp >= 0)
          {
            ret = (UINT32)tmp;
          }

          HX_RELEASE(pPrefBuf);
      }
        HX_RELEASE(pPrefs);
    }

    return ret;
}

/******************************************************************************
*   RTP RTP RTP RTP RTP 
******************************************************************************/

RTPBaseTransport::RTPBaseTransport(BOOL bIsSource)
    : RTSPTransport(bIsSource)
    , m_lRefCount(0)    
    , m_pBwMgrInput(0)    
    , m_pSyncServer(NULL)
    , m_streamNumber(0)
    , m_uFirstSeqNum(0)
    , m_ulFirstRTPTS(0)
    , m_bFirstSet(FALSE)
    , m_bWeakStartSync(FALSE)
    , m_lTimeOffsetHX(0)
    , m_lTimeOffsetRTP(0)
    , m_lOffsetToMasterHX(0)
    , m_lOffsetToMasterRTP(0)
    , m_lSyncOffsetHX(0)
    , m_lSyncOffsetRTP(0)
    , m_lNTPtoHXOffset(0)
    , m_bNTPtoHXOffsetSet(FALSE)
    , m_ulLastRTPTS(0)
    , m_ulLastHXTS(0)
    , m_ulLastRawRTPTS(0)
    , m_bLastTSSet(FALSE)
    , m_pRTCPTran(0)
    , m_pReportHandler(0)
    , m_ulBaseTS(0)
    , m_bHasMarkerRule(FALSE)
    , m_bHasRTCPRule(FALSE)
    , m_ulPayloadWirePacket(0)
    , m_bIsLive(FALSE)
    , m_ulExtensionSupport(0)
    , m_bSeqNoSet(FALSE)
    , m_bRTPTimeSet(FALSE)
    , m_bActive(TRUE)
    , m_pFirstPlayTime(NULL)
    , m_bWaitForStartInfo(TRUE)
    , m_bAbortWaitForStartInfo(FALSE)
    , m_bEmulatePVSession(FALSE)
    , m_pMBitHandler(NULL)
    , m_pQoSInfo(NULL)
    , m_bSSRCDetermined(FALSE)
    , m_ulSSRCDetermined(0)
    , m_cLSRRead(0)
    , m_cLSRWrite(0)
#ifdef RTP_MESSAGE_DEBUG
    , m_bMessageDebug(FALSE)
#endif      // RTP_MESSAGE_DEBUG
{
    m_wrapSequenceNumber = DEFAULT_WRAP_SEQ_NO;
}

RTPBaseTransport::~RTPBaseTransport()
{
    resetStartInfoWaitQueue();
}

STDMETHODIMP
RTPBaseTransport::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();
        *ppvObj = this;
        return HXR_OK;
    }
    else if (IsEqualIID(riid, IID_IHXSourceBandwidthInfo))
    {
        AddRef();
        *ppvObj = (IHXSourceBandwidthInfo*)this;
        return HXR_OK;
    }

    *ppvObj = NULL;
    return HXR_NOINTERFACE;
}

STDMETHODIMP_(UINT32)
RTPBaseTransport::AddRef()
{
    return InterlockedIncrement(&m_lRefCount);
}

STDMETHODIMP_(UINT32)
RTPBaseTransport::Release()
{
    if(InterlockedDecrement(&m_lRefCount) > 0)
    {
      return m_lRefCount;
    }
    delete this;
    return 0;
}

void
RTPBaseTransport::Done()
{
    HX_RELEASE(m_pQoSInfo);
    HX_RELEASE(m_pBwMgrInput);
    HX_RELEASE(m_pRTCPTran);
    HX_RELEASE(m_pPacketFilter);
    HX_RELEASE(m_pSyncServer);
    HX_DELETE(m_pFirstPlayTime);
}

HX_RESULT
RTPBaseTransport::init()
{
    // m_pReportHandler will be freed in RTCPBaseTransport::Done()...
    HX_ASSERT(!m_pReportHandler);
    m_pReportHandler =   
      new ReportHandler(m_bIsSource, !m_bIsSource, random32(HX_GET_TICKCOUNT()));    
    HX_ASSERT(m_pReportHandler);
    if(!m_pReportHandler)
    {
        return HXR_OUTOFMEMORY;
    }
    
#ifdef RTP_MESSAGE_DEBUG
    IHXPreferences* pPreferences = NULL;

    if (m_pContext &&
      (HXR_OK == m_pContext->QueryInterface(IID_IHXPreferences,
                                   (void**) &pPreferences)))
    {
      IHXBuffer* pBuffer = NULL;

      if (HXR_OK == pPreferences->ReadPref("RTPMessageDebug", pBuffer))
      {
          m_bMessageDebug = atoi((const char*)pBuffer->GetBuffer()) ? TRUE : FALSE;
          HX_RELEASE(pBuffer);
          if (m_bMessageDebug)
          {
            if (HXR_OK == pPreferences->ReadPref("RTPMessageDebugFile", pBuffer))
            {
                if (pBuffer->GetSize() <= 0)
                {
                  // no file name, no log
                  m_bMessageDebug = FALSE;
                }
                else
                {
                  m_messageDebugFileName = (const char*) pBuffer->GetBuffer();
                }             
            }
                HX_RELEASE(pBuffer);
          }
      }
    }

    HX_RELEASE(pPreferences);
#endif      // RTP_MESSAGE_DEBUG
    return HXR_OK;
}

void
RTPBaseTransport::addStreamInfo(RTSPStreamInfo* pStreamInfo, UINT32 ulBufferDepth)
{   
    RTSPTransport::addStreamInfo(pStreamInfo, ulBufferDepth);

    // there better be only one stream
    m_streamNumber = pStreamInfo->m_streamNumber;

    // if pStreamInfo->m_rtpPayloadType is -1, it hasn't been set
    // by user, so just assign RTP_PAYLOAD_RTSP
    if (pStreamInfo->m_rtpPayloadType < 0)
    {
      m_rtpPayloadType = RTP_PAYLOAD_RTSP;
    }
    else
    {
      m_rtpPayloadType = (UINT8)pStreamInfo->m_rtpPayloadType;
    } 

    if (pStreamInfo)
    {
      if (pStreamInfo->m_bHasMarkerRule)
      {
          m_bHasMarkerRule = pStreamInfo->m_bHasMarkerRule;
          m_markerRuleNumber = pStreamInfo->m_markerRule;
          // better be odd.
          HX_ASSERT(m_markerRuleNumber & 0x1);
      }
      
      m_bIsLive = pStreamInfo->m_bIsLive;
      m_ulExtensionSupport = pStreamInfo->m_bExtensionSupport;
      m_bActive = pStreamInfo->m_bActive;
      m_bIsSyncMaster = pStreamInfo->m_bIsSyncMaster;

      // RTP transport always creates RTP packets on reception
      if (!m_bIsSource)
      {
          RTSPStreamData* pStreamData = NULL;

          pStreamData = m_pStreamHandler->getStreamData(pStreamInfo->m_streamNumber);

          HX_ASSERT(pStreamData);

          if (pStreamData)
          {
            pStreamData->m_bUsesRTPPackets = TRUE;
          }

          if (pStreamData->m_pTSConverter)
          {
            m_pRTCPTran->SetTSConverter(
                pStreamData->m_pTSConverter->GetConversionFactors());
          }
      }
      
      /*
       *  Reflection support
       */
      m_bHasRTCPRule = pStreamInfo->m_bHasRTCPRule;
      if (m_bHasRTCPRule)
      {
          m_RTCPRuleNumber = pStreamInfo->m_RTCPRule;
      }
      m_ulPayloadWirePacket = pStreamInfo->m_ulPayloadWirePacket;

      if (m_pRTCPTran)
      {
          m_pRTCPTran->addStreamInfo(pStreamInfo, ulBufferDepth);
      }
    }
}

/*
*  We need to set an initial SeqNo & timestamp for RTP
*/

HX_RESULT
RTPBaseTransport::setFirstSeqNum(UINT16 uStreamNumber, UINT16 uSeqNum)
{
    HX_RESULT theErr = HXR_UNEXPECTED;

    // On client we allow setting of sequence number only once not to cause
    // havoc in transport buffer
    if (m_bIsSource || (!m_bSeqNoSet))
    {
      theErr = RTSPTransport::setFirstSeqNum(uStreamNumber, uSeqNum);

#ifdef RTP_MESSAGE_DEBUG
      messageFormatDebugFileOut("INIT: StartSeqNum=%u", 
                          uSeqNum);
#endif      // RTP_MESSAGE_DEBUG

      if (SUCCEEDED(theErr))
      {
          m_bSeqNoSet = TRUE;
      }
    }
    
    return theErr;
}

void
RTPBaseTransport::setFirstTimeStamp(UINT16 uStreamNumber, UINT32 ulTS, 
                                    BOOL bIsRaw)
{
    RTSPStreamData* pStreamData = 
      m_pStreamHandler->getStreamData(uStreamNumber);

    if (pStreamData)
    {
      if (m_bIsSource)
      {           
          /* ulFrom is what we want to put in RTP-Info: rtptimestamp */
          if (pStreamData->m_pTSConverter && !bIsRaw)
          {
            pStreamData->m_lastTimestamp = pStreamData->m_pTSConverter->hxa2rtp(ulTS);
          }
          else
          {
            pStreamData->m_lastTimestamp = ulTS;
          }
      }
      else if (!m_bRTPTimeSet)
      {
          // ulTS is what's reported in rtptime of RTP-Info PLAY response 
          // header in RTP time.  Unit is RTP.
          /*
           *      HXTimeval = PktTime*Factor - (ulTS*Factor - m_ulPlayRangeFrom)
           *  RTPTimeval = PktTime - (ulTS - m_ulPlayRangeFrom / Factor)
           */         
            if (m_ulPlayRangeFrom != RTSP_PLAY_RANGE_BLANK)
            {
              if (pStreamData->m_pTSConverter)
              {
                m_lTimeOffsetRTP = ulTS -
                               pStreamData->
                                  m_pTSConverter->hxa2rtp_raw(m_ulPlayRangeFrom);
                pStreamData->m_pTSConverter->setAnchor(m_ulPlayRangeFrom, ulTS);
                m_lTimeOffsetHX = 0;
              }
              else
              {
                m_lTimeOffsetHX = m_lTimeOffsetRTP = ulTS - m_ulPlayRangeFrom;            
              }
            }

          if ((m_ulPlayRangeFrom != RTSP_PLAY_RANGE_BLANK) &&
            (m_ulPlayRangeTo != RTSP_PLAY_RANGE_BLANK))
          {
            pStreamData->m_pTransportBuffer->InformTimestampRange(
                m_ulPlayRangeFrom,
                m_ulPlayRangeTo,
                STREAM_END_DELAY_RTP_TOLERANCE);
          }

#ifdef RTP_MESSAGE_DEBUG
          messageFormatDebugFileOut("INIT: RTPOffset=%u HXOffset=%u", 
                              m_lTimeOffsetRTP, 
                              m_lTimeOffsetHX);
#endif      // RTP_MESSAGE_DEBUG

          // Reset the time stamp ordering
          HX_DELETE(pStreamData->m_pTSOrderHack);
      }     

      m_bRTPTimeSet = TRUE;
    }
}

void
RTPBaseTransport::notifyEmptyRTPInfo(void)
{
    // If RTP-Info is empty there is no point in waiting for out-of-band
    // start info (start seq number and time stamp) since this is the
    // only out-of-band method of communicating start info. in RTP.
    m_bAbortWaitForStartInfo = TRUE;
}

void 
RTPBaseTransport::setPlayRange(UINT32 ulFrom, UINT32 ulTo)
{
    // this is the Range values in PLAY request in RMA time (ms) called on PLAY 
    // request
    RTSPTransport::setPlayRange(ulFrom, ulTo);
    
    m_bSeqNoSet = FALSE;
    m_bRTPTimeSet = FALSE;
    m_bWaitForStartInfo = TRUE;
    m_bAbortWaitForStartInfo = FALSE;
    m_uFirstSeqNum = 0;
    m_ulFirstRTPTS = 0;
    m_bFirstSet = FALSE;
    m_bWeakStartSync = FALSE;
    m_lTimeOffsetHX = 0;
    m_lTimeOffsetRTP = 0;
    m_lOffsetToMasterHX = 0;
    m_lOffsetToMasterRTP = 0;
    m_lSyncOffsetHX = 0;
    m_lSyncOffsetRTP = 0;
    m_ulLastRTPTS = 0;
    m_ulLastHXTS = 0;
    m_ulLastRawRTPTS = 0;
    m_bLastTSSet = FALSE;
    m_lNTPtoHXOffset = 0;
    m_bNTPtoHXOffsetSet = FALSE;
    resetStartInfoWaitQueue();

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("INIT: PlayRange=%u-%u", 
                         ulFrom, ulTo);
#endif      // RTP_MESSAGE_DEBUG
}

HX_RESULT
RTPBaseTransport::setFirstPlayTime(Timeval* pTv)
{
    if (!m_pFirstPlayTime)
    {
      m_pFirstPlayTime = new Timeval();
        if(!m_pFirstPlayTime)
        {
            return HXR_OUTOFMEMORY;
        }
    }
    
    m_pFirstPlayTime->tv_sec = pTv->tv_sec;
    m_pFirstPlayTime->tv_usec = pTv->tv_usec;
    return HXR_OK;
}

HX_RESULT
RTPBaseTransport::reflectPacket(BasePacket* pBasePacket, REF(IHXBuffer*)pSendBuf)
{
    HX_ASSERT(pBasePacket);
    HX_ASSERT(m_bHasRTCPRule);
    HX_ASSERT(m_ulPayloadWirePacket==1);
    
    HX_RESULT theErr = HXR_OK;
    
    IHXPacket* pPacket = pBasePacket->GetPacket();
    IHXBuffer* pBuffer = NULL;
    UINT32  ulLen = 0;
    
    /*
     *      Sanity check 
     */
    if (!pPacket)
    {
      return HXR_UNEXPECTED;
    }
    else if (pPacket->IsLost())
    {
      pPacket->Release();
      return HXR_IGNORE;
    }
    else
    {
      pBuffer = pPacket->GetBuffer();
      if (!pBuffer)
      {
          pPacket->Release();
          return HXR_UNEXPECTED;
      }    
    } 

    ulLen = pBuffer->GetSize();

    HX_ASSERT(pPacket->GetStreamNumber() == m_streamNumber);
    HX_ASSERT(pPacket->GetASMFlags());

    /*
     * RTP packet
     */    
    UINT16 streamNumber = pPacket->GetStreamNumber();    
    RTSPStreamData* pStreamData = 
      m_pStreamHandler->getStreamData(streamNumber);

    if (isRTCPRule(pPacket->GetASMRuleNumber()))
    {
      /*
       *  RTCP packet
       */
      if (!pStreamData->m_bFirstPacket)
      {
          if (m_reflectorInfo.m_unSeqNoOffset  && m_reflectorInfo.m_ulRTPTSOffset)
          {
            theErr = FixRTCPSR(m_pCommonClassFactory,
                           pBuffer, 
                             pSendBuf,
                             m_reflectorInfo.m_ulRTPTSOffset);
          }
          else
          {
            theErr = HXR_OK;
            pSendBuf = pBuffer;
            pSendBuf->AddRef();
          }
      }
      else
      {
          theErr = HXR_IGNORE;
      }

      BYTE* pReport = pBuffer->GetBuffer();
      
      if ((pReport) && ((*(++pReport)) == 200))
      {
          pReport += 7;
          UINT32 ulSourceSec = GetDwordFromBufAndInc(pReport);
          UINT32 ulSourceFract = GetDwordFromBufAndInc(pReport);
          
          HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
          Timeval tvNow((INT32) rmatv.tv_sec, (INT32)rmatv.tv_usec);
          NTPTime ntpNow(tvNow);
          
          m_LSRHistory [m_cLSRWrite].m_ulSourceLSR = ulSourceSec  << 16;
          m_LSRHistory [m_cLSRWrite].m_ulSourceLSR |= (ulSourceFract >> 16); 

          m_LSRHistory [m_cLSRWrite].m_ulServerLSR = ntpNow.m_ulSecond  << 16;
          m_LSRHistory [m_cLSRWrite].m_ulServerLSR |= (ntpNow.m_ulFraction >> 16); 
          (++m_cLSRWrite) %= LSR_HIST_SZ;
      }

      if (HXR_OK == theErr)
      {
          theErr = m_pRTCPTran->reflectRTCP(pSendBuf);
          HX_RELEASE(pSendBuf);
      }
      
      pPacket->Release();
      pBuffer->Release();

      if (HXR_OK == theErr)
      {
          return HXR_IGNORE;
      }
      else
      {
          return theErr;      
      }         
    }


    if (!pStreamData->m_packetSent)
    { 
      pStreamData->m_packetSent = TRUE;
    }
    
    if (pStreamData->m_bFirstPacket)
    {
      pStreamData->m_bFirstPacket = FALSE;

      BYTE* pcOrig = pBuffer->GetBuffer();
      UINT16 unFirstSeqNo = 0;
      UINT32 ulFirstRTPTS = 0;
      
      pcOrig += 2;
      unFirstSeqNo = *pcOrig++<<8; 
      unFirstSeqNo |= *pcOrig++;

        ulFirstRTPTS = GetDwordFromBufAndInc(pcOrig);

      if (m_pReportHandler)
      {
          m_pReportHandler->SetSSRC(GetDwordFromBufAndInc(pcOrig));
      }

        // get an offset for reflector
      UINT16 nA = m_reflectorInfo.m_unSeqNoOffset ;
      UINT16 nB = unFirstSeqNo;
      UINT32 lA = m_reflectorInfo.m_ulRTPTSOffset;
      UINT32 lB = ulFirstRTPTS;
      
      m_reflectorInfo.m_unSeqNoOffset = 0 - unFirstSeqNo;
      m_reflectorInfo.m_ulRTPTSOffset = 0 - ulFirstRTPTS;   
    }

    if (m_reflectorInfo.m_unSeqNoOffset  && m_reflectorInfo.m_ulRTPTSOffset)
    {
      theErr = FixRTPHeader(m_pCommonClassFactory,
                        pBuffer, 
                        pSendBuf,
                        m_reflectorInfo.m_unSeqNoOffset ,
                        m_reflectorInfo.m_ulRTPTSOffset);
    }
    else
    {
      theErr = HXR_OK;
      pSendBuf = pBuffer;
      pSendBuf->AddRef();
    }

    // forever increasing
    pStreamData->m_seqNo = pBasePacket->m_uSequenceNumber;

    pStreamData->m_lastTimestamp = pPacket->GetTime();


    HX_ASSERT(pBuffer);

    BYTE* pRawPkt = (BYTE*)pBuffer->GetBuffer();
    UINT32 ulPayloadLen = ulLen;
    UINT32 ulRTPHeaderSize = 0;

    UINT8 uiCSRCCount = (UINT32)(pRawPkt[0] & 0x0F);

// We only want to count the payload, not the RTP headers.

    ulRTPHeaderSize += (4 * 3); // RTP fixed header size, not including CSRCs.
    ulRTPHeaderSize += 4 * uiCSRCCount; // CSRCs. 

// Extension header present.
    if (pRawPkt[0] & 0x20)
    {
        HX_ASSERT(ulPayloadLen - ulRTPHeaderSize > 0);
        ulRTPHeaderSize += 2; // 16-bit profile-defined field

    // Overrun prevention.
        if (pBuffer->GetSize() > ulRTPHeaderSize + 1)
        {
            // Extension length is last 16 bits of first word.
            UINT32 ulExtensionLength = (pRawPkt[ulRTPHeaderSize] << 8) + pRawPkt[ulRTPHeaderSize + 1];
            ulRTPHeaderSize += 2; // 16-bit length field.
            ulRTPHeaderSize += (ulExtensionLength * 4); // Rest of extension header.
        }
    }
    

    ulPayloadLen -= ulRTPHeaderSize;

    updateQoSInfo(ulPayloadLen);

    /*
     *      clean up
     */
    pPacket->Release();
    pBuffer->Release();
    return theErr;
}

void
RTPBaseTransport::updateQoSInfo(UINT32 ulBytesSent)
{
    m_ulPacketsSent++;    
    m_lBytesSent += ulBytesSent;

    if (!m_pQoSInfo)
    {
        return;
    }

    UINT64 ulSessionBytesSent = m_pQoSInfo->GetBytesSent();
    ulSessionBytesSent += ulBytesSent;
    m_pQoSInfo->SetBytesSent(ulSessionBytesSent);

    UINT32 ulSessionPacketsSent = m_pQoSInfo->GetPacketsSent();
    ulSessionPacketsSent++;
    m_pQoSInfo->SetPacketsSent(ulSessionPacketsSent);
}

UINT32
RTPBaseTransport::MapLSR(UINT32 ulSourceLSR)
{
    if (m_ulPayloadWirePacket == 0)
    {
      return ulSourceLSR;
    }

    UINT8  cSearchCursor = m_cLSRRead;

    while (cSearchCursor != m_cLSRWrite)
    {
      if (m_LSRHistory [cSearchCursor].m_ulSourceLSR == ulSourceLSR)
      {
          m_cLSRRead = cSearchCursor;
          return m_LSRHistory [cSearchCursor].m_ulServerLSR;
      }

      (++cSearchCursor) %= LSR_HIST_SZ;
    }

    return 0;
}

HX_RESULT
FixRTPHeader(IHXCommonClassFactory* pCCF, 
           IHXBuffer* pOrigBuf, 
           REF(IHXBuffer*) pNewBuf, 
           UINT16 unSeqNoOffset, 
           UINT32 ulRTPTSOffset)
{          
    if (pOrigBuf->GetSize() < 8)
    {
      return HXR_INVALID_PARAMETER; 
    }
    
    HX_RESULT theErr = pCCF->CreateInstance(IID_IHXBuffer, (void**) &pNewBuf);
    if (HXR_OK == theErr)
    {
      theErr = pNewBuf->Set(pOrigBuf->GetBuffer(), pOrigBuf->GetSize());
    }
    if (HXR_OK == theErr)
    {
      BYTE* pcOrig = pOrigBuf->GetBuffer();
      UINT16 unSeqNo = 0;
      UINT32 ulRTPTS = 0;
      
      pcOrig += 2;
      unSeqNo = *pcOrig++<<8; 
      unSeqNo |= *pcOrig++;

      ulRTPTS = GetDwordFromBufAndInc(pcOrig);
        
        UINT16 nA = unSeqNo;
      UINT32 lA = ulRTPTS;
      
      // update
      unSeqNo += unSeqNoOffset;
      ulRTPTS += ulRTPTSOffset;           
      
      BYTE* pc = pNewBuf->GetBuffer();
      pc += 2;    
      *pc++ = (UINT8)(unSeqNo>>8); 
      *pc++ = (UINT8)(unSeqNo);
      *pc++ = (UINT8)(ulRTPTS>>24); 
      *pc++ = (UINT8)(ulRTPTS>>16); 
      *pc++ = (UINT8)(ulRTPTS>>8); 
      *pc++ = (UINT8)(ulRTPTS);
    }
    return theErr;
}

HX_RESULT
FixRTCPSR(IHXCommonClassFactory* pCCF, 
        IHXBuffer* pOrigBuf, 
        REF(IHXBuffer*) pNewBuf, 
        UINT32 ulRTPTSOffset)
{
    BYTE* pcOrig = pOrigBuf->GetBuffer();
    if (pOrigBuf->GetSize() < 20)
    {
      return HXR_INVALID_PARAMETER; 
    } 
    else
    {
      // make sure it's SR
      if (RTCP_SR != *(pcOrig+1))
      {
          return HXR_IGNORE;        
      }
    }
    
    HX_RESULT theErr = pCCF->CreateInstance(IID_IHXBuffer, (void**) &pNewBuf);
    if (HXR_OK == theErr)
    {
      theErr = pNewBuf->Set(pOrigBuf->GetBuffer(), pOrigBuf->GetSize());
    }
    if (HXR_OK == theErr)
    {
      UINT32 ulRTPTS = 0;

      pcOrig += 16;         
        ulRTPTS = GetDwordFromBufAndInc(pcOrig);

        UINT32 lA = ulRTPTS;
      // update
      ulRTPTS += ulRTPTSOffset;
 
      BYTE* pc = pNewBuf->GetBuffer();
      pc += 16;

      //RTP Timestamp
      *pc++ = (UINT8)(ulRTPTS>>24); 
      *pc++ = (UINT8)(ulRTPTS>>16); 
      *pc++ = (UINT8)(ulRTPTS>>8); 
      *pc++ = (UINT8)(ulRTPTS);
    }
    return theErr;    
}


void
RTPBaseTransport::SyncTimestamp(IHXPacket* pPacket)
{

    IHXTimeStampSync* pTSSync = NULL;
    if (FAILED(
      m_pResp->QueryInterface(IID_IHXTimeStampSync, (void**)&pTSSync)))
    {
      // this shouldn't happen...
      HX_ASSERT(!"IHXTimeStampSync not implemented");
      return;
    } 
      
    UINT32 ulInitialRefTime = 0;
    UINT32 ulInitialThisTime = pPacket->GetTime();

    if (pTSSync->NeedInitialTS(m_sessionID))
    {
      pTSSync->SetInitialTS(m_sessionID, pPacket->GetTime());
      ulInitialRefTime = ulInitialThisTime;
    }
    else
    {
      ulInitialRefTime = pTSSync->GetInitialTS(m_sessionID);
    } 
    HX_RELEASE(pTSSync);
    
    RTSPStreamData* pStreamData = 
      m_pStreamHandler->getStreamData(pPacket->GetStreamNumber());

    HX_ASSERT(pStreamData != NULL);
    if (pStreamData)
    {
      // calc the difference b/n reference stream
      if (ulInitialThisTime >= ulInitialRefTime)
      {
          // we want RTP time
          if (pStreamData->m_pTSConverter)
          {
            m_lTimeOffsetRTP = 
                pStreamData->m_pTSConverter->hxa2rtp(ulInitialThisTime - ulInitialRefTime);
          }
          else
          {
            m_lTimeOffsetRTP = ulInitialThisTime - ulInitialRefTime;
          }     
      }
      else
      {
          // we want RTP time
          if (pStreamData->m_pTSConverter)
          {
            m_lTimeOffsetRTP = 
                pStreamData->m_pTSConverter->hxa2rtp(ulInitialRefTime - ulInitialThisTime);
          }
          else
          {
            m_lTimeOffsetRTP = ulInitialRefTime - ulInitialThisTime;
          }     
          
          m_lTimeOffsetRTP *= -1;
      }
    }
}

// The pPacketBuf is returned with an AddRef(), as it must.

HX_RESULT
RTPBaseTransport::makePacket(BasePacket* pBasePacket, 
                      REF(IHXBuffer*) pPacketBuf)
{
    if(!m_bIsSource)
    {
      HX_ASSERT(!"Player shouldn't be making pkt");   
      return HXR_UNEXPECTED;
    }

    IHXPacket* pPacket = pBasePacket->GetPacket();

    if (!pPacket)
    {
      return HXR_UNEXPECTED;
    }
    else if (pPacket->IsLost())
    {
      pPacket->Release();
      return HXR_OK;
    }
 
    IHXBuffer* pBuffer = pPacket->GetBuffer();
    UINT32 bufLen = pBuffer->GetSize();
    UINT16 streamNumber = pPacket->GetStreamNumber();
    UINT16 ruleNumber = pPacket->GetASMRuleNumber();
    UINT8  ruleFlags = pPacket->GetASMFlags();

    // it better be the same
    HX_ASSERT(m_streamNumber == streamNumber);

    RTSPStreamData* pStreamData = 
      m_pStreamHandler->getStreamData(streamNumber);

    //XXXBAB
    if (!pStreamData->m_packetSent)
    { 
      pStreamData->m_packetSent = TRUE;
    }
    
    pStreamData->m_seqNo = pBasePacket->m_uSequenceNumber;

    /*
     *      Make RTP Packet
     */
    RTPPacket pkt;
    HX_RESULT hresult = HXR_OK;
    BOOL bCompressed = FALSE; //XXXBAB don't compress anything yet...
    UINT32 packetLen = 0;
    pkt.version_flag = 2;
    pkt.padding_flag = 0;
    pkt.csrc_len = 0;

    /*
     *      Basics
     */
    pkt.seq_no = pStreamData->m_seqNo;
    pkt.data.data = (INT8*)pBuffer->GetBuffer();
    pkt.data.len = HX_SAFEINT(pBuffer->GetSize());
    pkt.ssrc = m_pReportHandler->GetSSRC();
    pkt.extension_flag = 0;
    pkt.payload = m_rtpPayloadType;

    /*
     *      IHXRTPPacket support
     */
    if (pStreamData->m_bFirstPacket)
    {
      // figure out pkt type
      IHXRTPPacket* pRTPPacket = NULL;
      pStreamData->m_bUsesRTPPackets = (pPacket->QueryInterface(
                                  IID_IHXRTPPacket, 
                                  (void**) &pRTPPacket)
                                == HXR_OK);
      if (pStreamData->m_bUsesRTPPackets)
      {
          HX_ASSERT(pRTPPacket == pPacket);
          if (pRTPPacket != pPacket)
          {
            return HXR_INVALID_PARAMETER;
          }
      }
      HX_RELEASE(pRTPPacket);

      // figure out marker bit handling routine
      if (NULL == m_pMBitHandler)
      {
          IHXRTPPacketInfo* pRTPPacketInfo = NULL;    
          if (pPacket->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pRTPPacketInfo) == HXR_OK)
          {
            m_pMBitHandler = &RTPBaseTransport::MBitRTPPktInfo;
            pRTPPacketInfo->Release();
          }
          else
          {
            m_pMBitHandler = &RTPBaseTransport::MBitASMRuleNo;
          }       
      }     
    }

    /* 
     * Marker Bit
     */
    (this->*m_pMBitHandler)(pkt.marker_flag, pPacket, ruleNumber);

    if (m_bRTPTimeSet)
    {
      SyncTimestamp(pPacket);
    }     

    /*
     *      Timestamp
     */
    if (pStreamData->m_bUsesRTPPackets)
    {
      pkt.timestamp = ((IHXRTPPacket*) pPacket)->GetRTPTime();
    }
    else if (pStreamData->m_pTSConverter)
    {
      pkt.timestamp = 
          pStreamData->m_pTSConverter->hxa2rtp(pPacket->GetTime());
    }
    else
    {
      pkt.timestamp = pPacket->GetTime();
    }

    /*
     *      Extension and asm rule
     */
    if (RTP_OP_ASMRULES == m_ulExtensionSupport)
    {
      // this is the only one right now.
      pkt.extension_flag = 1;
      
      pkt.op_code = RTP_OP_ASMRULES;
      pkt.op_code_data_length = 1;
      pkt.asm_flags = ruleFlags;
      pkt.asm_rule = ruleNumber;
    }
    else
    {
      pkt.extension_flag = 0;
    }

    if (pStreamData->m_bFirstPacket)
    { 
      m_pRTCPTran->startScheduler();      
      m_pRTCPTran->m_bSendBye = TRUE;
      pStreamData->m_bFirstPacket = FALSE;

      // init report handler with starting NTP time and
      // 0 RTP time as the reference point.
      m_pReportHandler->Init(*m_pFirstPlayTime, 
                         0, 
                         pStreamData->m_pTSConverter);
      
      // at this point, it should have the same stream number
      HX_ASSERT(m_streamNumber == m_pRTCPTran->m_streamNumber);
      HX_ASSERT(m_bRTPTimeSet);
    }

    // externally, we need to offset the timestamp...
    // 
    // XXXGo 
    // In RTSP PLAY Response msg, there is a RTP-Info header in which there 
    // is a rtp timestap that corresponds to NTP time spesified in PLAY Request.  
    // Since PLAY Response goes out before we ever receive the first pkt, it 
    // always returns RTP time equivalent of NTP time in a Range header as a timestamp. 
    // So, we need to offset the timestamp here.  
    // In future, we might want to change the calling sequence so we don't have to do this...    
    if (m_bRTPTimeSet)
    {
        INT64 nNewRTPOffset = 0;
        UINT32 ulRawRTPTime = 0;
        HXTimeval hxNow = m_pScheduler->GetCurrentSchedulerTime();
        Timeval tvNow;
      NTPTime ntpNow;
      
      // Convert scheduler time to something we can use.
        tvNow.tv_sec = hxNow.tv_sec;
        tvNow.tv_usec = hxNow.tv_usec;
      
        ntpNow = NTPTime(tvNow);
      
      
        if (pStreamData->m_pTSConverter)
        {
            ulRawRTPTime = pStreamData->m_pTSConverter->hxa2rtp((UINT32)(ntpNow - m_pReportHandler->GetNTPBase()));
        }
        else
        {
            ulRawRTPTime = (UINT32)(ntpNow - m_pReportHandler->GetNTPBase());
        }
      
      nNewRTPOffset = CAST_TO_INT64 pkt.timestamp - CAST_TO_INT64 ulRawRTPTime;
      
        m_pReportHandler->SetRTPBase(nNewRTPOffset);
      
      // if this is true, there was a Range header in a PLAY request
      m_bRTPTimeSet = FALSE;
      
      if (m_bIsLive)
      {
          m_ulBaseTS = pkt.timestamp - m_lTimeOffsetRTP;
      }
      
    }

    if (m_bIsLive)
    {
      pkt.timestamp -= m_ulBaseTS;
    }

    pStreamData->m_lastTimestamp = pkt.timestamp;

    /*
     * Create enough space to account for the op code and
     * op code data if the extension bit is set
     */
    packetLen = pkt.static_size() + pBuffer->GetSize() + 
            (pkt.extension_flag 
            ? sizeof(UINT16) + (pkt.op_code_data_length * sizeof(UINT32))
            : 0);

    IHXBuffer* pPacketOut = NULL;
    m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                          (void**)&pPacketOut);
    if(pPacketOut)
    {
        pPacketOut->SetSize(packetLen);
        pkt.pack(pPacketOut->GetBuffer(), packetLen);
        pPacketOut->SetSize(packetLen);  //update with actual packed length

#ifdef DEBUG
        if (m_drop_packets && ++m_packets_since_last_drop % 10 == 0)
        {
          goto RTPsendContinue;
        }
#endif /* DEBUG */

        updateQoSInfo(bufLen);

        // out params...
        pPacketBuf = pPacketOut;

        /* update */
        m_pReportHandler->OnRTPSend(pkt.seq_no, 1, pBasePacket->GetSize(), pkt.timestamp);    
    }
    else
    {
        hresult = HXR_OUTOFMEMORY;
    }

#ifdef DEBUG
RTPsendContinue:
#endif    

    pBuffer->Release();
    pPacket->Release();
    return hresult;
}

HX_RESULT
RTPBaseTransport::handlePacket(IHXBuffer* pBuffer)
{
    if (!m_ulPacketsSent && m_bEmulatePVSession)
    {
        /* XXXMC 
         * Special-case handling for PV clients
         */
        UINT8* pUDPPktPayload = pBuffer->GetBuffer();
        UINT8 ucRTPVersion  = (*pUDPPktPayload & 0xc0)>>6;

        if(ucRTPVersion != 2)
        {
            DPRINTF(D_INFO, ("RTP: PV CLIENT PKT RECVD\n"));
            this->sendPVHandshakeResponse(pUDPPktPayload);
            return HXR_OK;
        }
    }

    return _handlePacket(pBuffer, TRUE);
}

HX_RESULT
RTPBaseTransport::_handlePacket(IHXBuffer* pBuffer, BOOL bIsRealTime)
{
    RTPPacket pkt;
    UINT32 timeStampHX = 0;
    UINT32 timeStampRTP = 0;
    HX_RESULT hresult = HXR_OK;
    BOOL bHasASMRules = FALSE;

    if (m_bIsSource)
    {
      return HXR_OK;
    }

    if(pkt.unpack(pBuffer->GetBuffer(), pBuffer->GetSize()) == 0)
    {
      return HXR_UNEXPECTED;
    }
    
    if(pkt.version_flag != 2)
    {
      return HXR_INVALID_VERSION;
    }

    // ignore the packets not matching the payload type
    if (pkt.payload != m_rtpPayloadType)
    {
        return HXR_OK;
    }

    // stick with the 1st ssrc with the same payload type
    if (!m_bSSRCDetermined)
    {
        m_bSSRCDetermined = TRUE;

        m_ulSSRCDetermined = pkt.ssrc;
        m_pRTCPTran->setSSRC(m_ulSSRCDetermined);
    }
    // ignore the packets with different ssrc but with the same payload time
    else if (m_ulSSRCDetermined != pkt.ssrc)
    {
        return HXR_OK;
    }

    RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber);
    HX_ASSERT(pStreamData != NULL);

    // If this function is called in Real-Time, handle RTCP response as needed
    if (bIsRealTime)
    {
      HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
      ULONG32 ulPktRcvTime = rmatv.tv_sec*1000 + rmatv.tv_usec/1000;

      // Convert reception time to the packet time stamp units
      if (m_pRTCPTran->GetTSConverter())
      {
          ulPktRcvTime = m_pRTCPTran->GetTSConverter()->hxa2rtp(ulPktRcvTime);
      }

      // Gather data for RTCP RR
      if (pStreamData->m_bFirstPacket)
      {
          if (!m_pRTCPTran->isShedulerStarted())
          {
            m_pRTCPTran->startScheduler();
          }
      }
      
      /* update */
      m_pReportHandler->OnRTPReceive(pkt.ssrc, 
                               pkt.seq_no,
                               pkt.timestamp, 
                               ulPktRcvTime);
      
      /* send RR if necessary */
      if (m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP)
      {
          m_pRTCPTran->sendReceiverReport();
          m_pRTCPTran->m_bSendReport = FALSE;
          m_pRTCPTran->scheduleNextReport();
      }
    }

    // If we are waiting for the start info, we cannot place the
    // packets into the transport buffer yet since we need the
    // start info to proper scale and offset the packet time
    // stamps
    if (m_bWaitForStartInfo)
    {
      if (m_StartInfoWaitQueue.GetCount() == 0)
      {
          // First packet received
          m_ulStartInfoWaitStartTime = HX_GET_TICKCOUNT();
          m_uFirstSeqNum = pkt.seq_no;
          m_ulFirstRTPTS = pkt.timestamp;
          // For Live stream, postpone identification of first packet until we get
          // a contiguous sequence (some servers have discontinuity on start 
          // of live streams)
          if (!m_bIsLive)
          {
            m_bFirstSet = TRUE;
          }
      }
      else
      {
          LONG32 lSeqNumDelta = ((LONG32) (((UINT16) pkt.seq_no) - m_uFirstSeqNum));

          // First SeqNum and First TS need not belong to the same packet
          // We are really looking for the lowest seq. num and lowest time
          // stamp so that we do not throw away any packets and so that the
          // time is not wrapped before 0
          if (lSeqNumDelta < 0)
          {
            m_uFirstSeqNum = pkt.seq_no;
          }
          else if (!m_bFirstSet)
          {
            // If we have not encountered continuty yet, look for it
            if (lSeqNumDelta > MAX_NUM_PACKETS_GAPPED_FOR_LIVE_START)
            {
                resetStartInfoWaitQueue();
                m_uFirstSeqNum = pkt.seq_no;
                m_ulFirstRTPTS = pkt.timestamp;
            }
            else
            {
                // Continuity found - we have the start
                m_bFirstSet = TRUE;
            }
          }

          if (((LONG32) (m_ulFirstRTPTS - pkt.timestamp)) > 0)
          {
            m_ulFirstRTPTS = pkt.timestamp;
          }
      }

      pBuffer->AddRef();
      m_StartInfoWaitQueue.AddTail(pBuffer);

      /* If start Info has been at least partially set or the wait has been
         aborted for some reason (usually when we know it will not be set 
         through out-of band methods <-> RTP Info did not contain start Info
         we need) or we time-out, stop waiting and hand off acumulated 
           packets to the transport buffer.
         Also if starting seq. number is not explicitly communicated,
         scan through few starting packets until we have a good starting 
         sequence number (contiguous) since some servers send lossy streams 
         in the beginning. */
      if (m_bSeqNoSet || 
          ((m_bRTPTimeSet || m_bAbortWaitForStartInfo) && 
           ((!m_bIsLive) || (m_StartInfoWaitQueue.GetCount() >= MIN_NUM_PACKETS_SCANNED_FOR_LIVE_START))) ||
          ((HX_GET_TICKCOUNT() - m_ulStartInfoWaitStartTime) > 
           MAX_STARTINFO_WAIT_TIME))
      {
          IHXBuffer* pStoredBuffer;

          m_bWaitForStartInfo = FALSE;
          m_bAbortWaitForStartInfo = FALSE;
          m_bFirstSet = TRUE;

          while (!m_StartInfoWaitQueue.IsEmpty())
          {
            pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead();

            if (pStoredBuffer)
            {
                _handlePacket(pStoredBuffer, FALSE);
                pStoredBuffer->Release();
            }
          }
      }

      return HXR_OK;
    }

    /*
     *      Extension and asm rule
     */
    if (pkt.extension_flag == 1)
    {
      HX_ASSERT(RTP_OP_PACKETFLAGS != pkt.op_code);

      if (RTP_OP_ASMRULES == pkt.op_code)
      {
          bHasASMRules = TRUE;
      }
    }

    /*
     *      RTP-Info:  if either one of them were not in RTP-Info, we need to set 
     *      it right here.
     */
    if (!m_bSeqNoSet)
    {
      if (!m_bFirstSet)
      {
          m_uFirstSeqNum = pkt.seq_no;
      }

#ifdef RTP_MESSAGE_DEBUG
      messageFormatDebugFileOut("INIT: StartSeqNum not in RTP-Info");
#endif      // RTP_MESSAGE_DEBUG

      setFirstSeqNum(m_streamNumber, m_uFirstSeqNum);
    }     
    if (!m_bRTPTimeSet)
    {
      if (!m_bFirstSet)
      {
          m_ulFirstRTPTS = pkt.timestamp;
      }

#ifdef RTP_MESSAGE_DEBUG
      messageFormatDebugFileOut("INIT: RTPOffset not in RTP-Info");
#endif      // RTP_MESSAGE_DEBUG

      setFirstTimeStamp(m_streamNumber, m_ulFirstRTPTS);
      m_bWeakStartSync = TRUE;
    }
    
    /*
     *      TimeStamp
     */    
    // for RealMedia in scalable multicast, we don't want to adjust
    // the timestamp since the packets' time is in ms already and 
    // A/V is always in sync
    if (m_bSkipTimeAdjustment)
    {
        timeStampRTP = timeStampHX = pkt.timestamp;
    }
    else if (m_bLastTSSet && (m_ulLastRawRTPTS == (ULONG32)pkt.timestamp))
    {
      // We want to preserve same time stamped packet sequences
      // since some payloads may depend on it for proper coded frame
      // assembly
      timeStampRTP = m_ulLastRTPTS;
      timeStampHX = m_ulLastHXTS;
    }
    else
    {
      if (pStreamData->m_pTSConverter)
      {
          timeStampHX = pStreamData->m_pTSConverter->rtp2hxa(pkt.timestamp);
      }
      else
      {
          timeStampHX = pkt.timestamp;
      }

      timeStampHX += (m_lSyncOffsetHX + 
                   m_lOffsetToMasterHX - 
                   m_lTimeOffsetHX);  
      timeStampRTP = (pkt.timestamp +
                  m_lSyncOffsetRTP +
                  m_lOffsetToMasterRTP - 
                  m_lTimeOffsetRTP);

      m_ulLastHXTS = timeStampHX;
      m_ulLastRTPTS = timeStampRTP;
      m_ulLastRawRTPTS = pkt.timestamp;
      m_bLastTSSet = TRUE;
    }

#ifdef RTP_MESSAGE_DEBUG
    if (m_bMessageDebug)
    {
      messageFormatDebugFileOut("PKT: (Seq=%6u,RTPTime=%10u) -> (HXTimeval=%10u,RTPTimeval=%10u)",
                          ((UINT16) pkt.seq_no), pkt.timestamp,
                          timeStampHX, timeStampRTP);         
    }
#endif      // RTP_MESSAGE_DEBUG
    
    pStreamData->m_bFirstPacket = FALSE;
    
    CHXPacket* pPacket = new CHXRTPPacket;
    if(pPacket)
    {
        pPacket->AddRef();
    }
    else
    {
        hresult = HXR_OUTOFMEMORY;
    }

    UINT32 dataOffset=
        (UINT32)((PTR_INT)pkt.data.data - (PTR_INT)pBuffer->GetBuffer());
    IHXBuffer* pPktBuffer = 
        new CHXStaticBuffer(pBuffer, dataOffset, pkt.data.len);

    if(pPktBuffer)
    {
        pPktBuffer->AddRef();
    }
    else
    {
        hresult = HXR_OUTOFMEMORY;
    }
    if( hresult == HXR_OUTOFMEMORY )
    {
        HX_RELEASE(pPacket);
        return hresult;
    } 
    
    if (bHasASMRules)
    {
      pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
            (UINT8) pkt.asm_flags, pkt.asm_rule);
    }
    else if(pkt.marker_flag == 1 && m_bHasMarkerRule)
    {
      pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
            HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, m_markerRuleNumber);
    }
    else
    {
      pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
          HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, pkt.marker_flag ? 1 : 0);
    }
    
    if (m_bIsSource)
    {
      hresult = m_pResp->PacketReady(HXR_OK, 
                               m_sessionID, 
                               pPacket);
    }
    else
    {
      hresult = storePacket(pPacket,
                            m_streamNumber,
                            pkt.seq_no,
                            0,
                            0);
    }

    pPktBuffer->Release();
    pPacket->Release();

    return hresult;
}

HX_RESULT 
RTPBaseTransport::handleRTCPSync(NTPTime ntpTime, ULONG32 ulRTPTime)
{
    HX_RESULT retVal = HXR_IGNORE;

    // We use RTCP synchronization on live streams only.
    // Static streams have no reason not to be synchronzied in RTP time.
    // Making use of RTCP for static streams may result in unwanted sync.
    // noise/error for servers who do not generate proper RTCP ntp-rtp
    // mapping.  RealServers prior to RealServer9 had error in RTCP reported
    // ntp-rtp mapping (max. error 1s, avg 500ms).
    if ((m_bIsLive || m_bWeakStartSync) && !m_bSkipTimeAdjustment)
    {
      ULONG32 ulNtpHX = ntpTime.toMSec();
      RTSPStreamData* pStreamData = 
          m_pStreamHandler->getStreamData(m_streamNumber);

#ifdef RTP_MESSAGE_DEBUG
      messageFormatDebugFileOut("RTCP-SYNC: Received NTPTime=%u RTPTime=%u", 
                          ulNtpHX, ulRTPTime);
#endif      // RTP_MESSAGE_DEBUG

      // We ignore the RTCP sync until we can compute npt (m_bRTPTimeSet) or
      // if the RTCP packet contains no synchronization information 
      // (ulNtpHX == 0)
      if (pStreamData && (ulNtpHX != 0) && m_bRTPTimeSet)
      {
          // Npt time can be computed (ulHXTime)
          ULONG32 ulHXTime = pStreamData->m_pTSConverter->rtp2hxa(ulRTPTime);
          
          retVal = HXR_OK;
          
          if ((!m_pSyncServer) && m_pResp)
          {
            m_pResp->QueryInterface(IID_IHXTransportSyncServer, 
                              (void**) &m_pSyncServer);
          }
          
          if (m_bNTPtoHXOffsetSet)
          {
            // We can sync - NTP to NPT offset is known
            ULONG32 ulExpectedHXTime = ulNtpHX + m_lNTPtoHXOffset;
            LONG32 lSyncOffsetHX = ulExpectedHXTime - ulHXTime;
            LONG32 lSyncOffsetChange = lSyncOffsetHX - m_lSyncOffsetHX;
            
            if ((lSyncOffsetChange > ACCEPTABLE_SYNC_NOISE) ||
                (lSyncOffsetChange < (-ACCEPTABLE_SYNC_NOISE)))
            {
                if (m_bIsSyncMaster && m_pSyncServer)
                {
#ifdef RTP_MESSAGE_DEBUG
                  messageFormatDebugFileOut("RTCP-SYNC: Distribute Master Sync NPTTime=%u SyncOffset=%d", 
                                        ulHXTime, -lSyncOffsetHX);
#endif      // RTP_MESSAGE_DEBUG
                  m_pSyncServer->DistributeSync(ulHXTime, -lSyncOffsetHX);
                }
                else
                {             
                  m_lSyncOffsetHX = lSyncOffsetHX;
                  if (lSyncOffsetHX >= 0)
                  {
                      m_lSyncOffsetRTP = (LONG32) 
                        (pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lSyncOffsetHX));
                  }
                  else
                  {
                      m_lSyncOffsetRTP = (LONG32) 
                        (-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lSyncOffsetHX)));
                  }
#ifdef RTP_MESSAGE_DEBUG
                  messageFormatDebugFileOut("RTCP-SYNC: Self-Sync SyncOffset=%d SyncOffsetRTP=%d", 
                                        m_lSyncOffsetHX, m_lSyncOffsetRTP);
#endif      // RTP_MESSAGE_DEBUG
                }
            }
          }
          else
          {
            // This the first RTCP sync accross all streams, anchor sync
            if (m_pSyncServer)
            {
#ifdef RTP_MESSAGE_DEBUG
                messageFormatDebugFileOut("RTCP-SYNC: Distribute NTP-NPT Mapping NTPTime=%u NPTTime=%u", 
                                    ulNtpHX, ulHXTime);
#endif      // RTP_MESSAGE_DEBUG
                m_pSyncServer->DistributeSyncAnchor(ulHXTime, ulNtpHX);
            }
          }
      }
    }

    return retVal;
}

HX_RESULT
RTPBaseTransport::anchorSync(ULONG32 ulHXTime, ULONG32 ulNTPTime)
{
    HX_RESULT retVal = HXR_OK;

    m_lNTPtoHXOffset = ulHXTime - ulNTPTime;
    m_bNTPtoHXOffsetSet = TRUE;

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("RTCP-SYNC: Received NTP-NPT Mapping NTPTime=%u NPTTime=%u NTPtoNPTOffset=%d", 
                        ulNTPTime, ulHXTime, m_lNTPtoHXOffset);
#endif      // RTP_MESSAGE_DEBUG

    return retVal;
}

HX_RESULT 
RTPBaseTransport::handleMasterSync(ULONG32 ulHXTime, LONG32 lHXOffsetToMaster)
{
    HX_RESULT retVal = HXR_IGNORE;
    RTSPStreamData* pStreamData = 
          m_pStreamHandler->getStreamData(m_streamNumber);

    if (pStreamData && (!m_bIsSyncMaster))
    {
      retVal = HXR_OK;
      
      m_lOffsetToMasterHX = lHXOffsetToMaster;
      if (lHXOffsetToMaster >= 0)
      {
          m_lOffsetToMasterRTP = (LONG32) 
            (pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lHXOffsetToMaster));
      }
      else
      {
          m_lOffsetToMasterRTP = (LONG32) 
            (-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lHXOffsetToMaster)));
      }

#ifdef RTP_MESSAGE_DEBUG
      messageFormatDebugFileOut("RTCP-SYNC: Master-Sync NPTTime=%u MasterSyncOffset=%d MasterSyncOffsetRTP=%d", 
                          ulHXTime, m_lOffsetToMasterHX, m_lOffsetToMasterRTP);
#endif      // RTP_MESSAGE_DEBUG
    }

    return retVal;
}

void
RTPBaseTransport::resetStartInfoWaitQueue(void)
{
    IHXBuffer* pStoredBuffer;

    while (!m_StartInfoWaitQueue.IsEmpty())
    {
      pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead();
      
      HX_RELEASE(pStoredBuffer);
    }
}

HX_RESULT
RTPBaseTransport::streamDone(UINT16 streamNumber)
{
    HX_ASSERT(m_streamNumber == streamNumber);
    HX_ASSERT(m_streamNumber == m_pRTCPTran->m_streamNumber);
    HX_RESULT hresult = HXR_OK;

    HX_ASSERT(m_pRTCPTran);

    if (!m_bActive)
    {
      // this stream is not active, don't do anything.
    }
    else if (m_bIsSource)
    {
      hresult= m_pRTCPTran->streamDone(streamNumber);
    }
    else
    {
      // send BYE pkt
      m_pRTCPTran->streamDone(streamNumber);
      hresult = m_pResp->OnStreamDone(HXR_OK, streamNumber);
    }

    return hresult;
}

STDMETHODIMP
RTPBaseTransport::InitBw(IHXBandwidthManagerInput* pBwMgr)
{
    HX_RELEASE(m_pBwMgrInput);

    m_pBwMgrInput = pBwMgr;
    pBwMgr->AddRef();

    return HXR_OK;
}

STDMETHODIMP
RTPBaseTransport::SetTransmitRate(UINT32 ulBitRate)
{
    return HXR_OK;
}

/*
 * XXXMC
 * Special-case handling for PV clients
 */
void
RTPBaseTransport::setPVEmulationMode(BOOL bPVSessionFlag)
{
    m_bEmulatePVSession = bPVSessionFlag;
}

void
RTPBaseTransport::setRTCPTransport(RTCPBaseTransport* pRTCPTran)
{
    HX_ASSERT(pRTCPTran);
    HX_ASSERT(!m_pRTCPTran);
    m_pRTCPTran = pRTCPTran;
    m_pRTCPTran->AddRef();

    // pointing to the same instatnce
    HX_ASSERT(m_pReportHandler);
    m_pRTCPTran->m_pReportHandler = m_pReportHandler;
}

void 
RTPBaseTransport::MBitRTPPktInfo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    BOOL b = FALSE;
    IHXRTPPacketInfo* pRTPPacketInfo = NULL;    
    if (pPkt->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pRTPPacketInfo) == HXR_OK)
    {
      if (pRTPPacketInfo->GetMarkerBit(b) == HXR_OK && b)
      {
          bMBit = TRUE;
      }
      else
      {
          bMBit = FALSE;
      }
      pRTPPacketInfo->Release();
    }
    else
    {
      bMBit = FALSE;
    } 
}

void 
RTPBaseTransport::MBitASMRuleNo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    bMBit = m_bHasMarkerRule && ((unRuleNo & 0x1) == m_markerRuleNumber);
}

#ifdef RTP_MESSAGE_DEBUG
void
RTPBaseTransport::messageFormatDebugFileOut(const char* fmt, ...)
{
    if(m_bMessageDebug)
    {
      va_list args;
      char buf[4096]; /* Flawfinder: ignore */

      SafeSprintf(buf, 4096, "%s.%d", (const char*) m_messageDebugFileName, 
                        m_streamNumber);

      va_start(args, fmt);
      
      FILE* fp = fopen(buf, "a");
      if (fp)
      {
          vsprintf(buf, fmt, args);
          fprintf(fp, "%s\n", buf);
          fclose(fp);
      }

      va_end(args);
    }
}
#endif      // RTP_MESSAGE_DEBUG

/*
 *   RTP UDP
 */
RTPUDPTransport::RTPUDPTransport(BOOL bIsSource)
    : RTPBaseTransport(bIsSource)
    , m_pUDPSocket(NULL)
    , m_foreignAddr(0)
    , m_foreignPort(0)
    , m_keepAliveSeq((UINT16)(random32(0) & 0xffff))
    , m_ulCurrentMulticastAddress(0)
    , m_ulCurrentMulticastPort(0)
    , m_pMCastUDPSocket(NULL)
{
}

RTPUDPTransport::~RTPUDPTransport()
{
    Done();
}

RTSPTransportTypeEnum
RTPUDPTransport::tag()
{
    return RTSP_TR_RTP_UDP;
}

void
RTPUDPTransport::Done()
{
    m_keepAlive.reset();

    if (m_pMCastUDPSocket)
    {
        m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    HX_RELEASE(m_pMCastUDPSocket);
    HX_RELEASE(m_pUDPSocket);
    RTPBaseTransport::Done();
}

HX_RESULT
RTPUDPTransport::init(IUnknown* pContext,
                   IHXUDPSocket* pSocket,
                   IHXRTSPTransportResponse* pResp)
{
    m_pResp = pResp;
    m_pResp->AddRef();
    
    m_pUDPSocket = pSocket;
    m_pUDPSocket->AddRef();

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pOpt = NULL;
    if (SUCCEEDED(m_pUDPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt)))
    {
      IHXQoSDiffServConfigurator* pCfg = NULL;
      if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg)))
      {
          pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_MEDIA);
          pCfg->Release();
          pCfg = NULL;
      }
      
      pOpt->Release();
      pOpt = NULL;
    }

    HX_RESULT hresult = Init(pContext);
    if(HXR_OK != hresult)
    {
      return hresult;
    }

#ifdef DEBUG
    if (debug_func_level() & DF_DROP_PACKETS)
    {
      m_drop_packets = TRUE;
    }
#endif /* DEBUG */

    RTPBaseTransport::init();

    return HXR_OK;
}

void
RTPUDPTransport::setForeignAddress(UINT32 foreignAddr, UINT16 foreignPort)
{
    m_foreignAddr = foreignAddr;
    m_foreignPort = foreignPort;

    UINT32 natTimeout = GetNATTimeout(m_pContext);

    if (!m_bIsSource && natTimeout)
    {
      // Initialize keepalive object
      m_keepAlive.Init(m_pScheduler, natTimeout, new KeepAliveCB(this));
    
      // Do initial "poke" through the NAT
      onNATKeepAlive();
    }
}

HX_RESULT RTPUDPTransport::handlePacket(IHXBuffer* pBuffer)
{
    m_keepAlive.OnActivity();
    return RTPBaseTransport::handlePacket(pBuffer);
}

void 
RTPUDPTransport::JoinMulticast(UINT32 ulAddress, UINT32 ulPort, IHXUDPSocket* pUDP)
{
    if (m_ulCurrentMulticastAddress)
    {
      m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    else
    {
      m_pMCastUDPSocket = pUDP;
        m_pMCastUDPSocket->AddRef();
    }

    m_pMCastUDPSocket->JoinMulticastGroup(ulAddress, HXR_INADDR_ANY);
    m_bMulticast = TRUE;
    m_ulCurrentMulticastAddress = ulAddress;
    m_ulCurrentMulticastPort = ulPort;

    if (m_pStreamHandler)
    {
      RTSPStreamData* pStreamData = m_pStreamHandler->firstStreamData();

      ASSERT(pStreamData);

      while(pStreamData)
      {
          pStreamData->m_pTransportBuffer->SetMulticast();
          pStreamData = m_pStreamHandler->nextStreamData();
      }
    }

    return;
}

HX_RESULT RTPUDPTransport::onNATKeepAlive()
{
    DPRINTF(D_INFO, ("RTP : onNATKeepAlive()\n"));

    // Send an RTP packet with PT=0 and no payload

    IHXBuffer* pPktBuf = NULL;
    if (m_pCommonClassFactory &&
      (HXR_OK == m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pPktBuf)))
    {
      RTPPacket pkt;
      
      pkt.version_flag = 2;
      pkt.padding_flag = 0;
      pkt.csrc_len = 0;
      pkt.marker_flag = 0;
      pkt.extension_flag = 0;
      
      pkt.data.data = 0;
      pkt.data.len = 0;
      pkt.ssrc = m_pReportHandler->GetSSRC();
      
      pkt.payload = 0;
      pkt.seq_no = m_keepAliveSeq++;
      pkt.timestamp =  HX_GET_TICKCOUNT() * 8; // Timestamp in 1/8000 sec
      
      UINT32 packetLen = pkt.static_size() + pkt.data.len;
      
      if (HXR_OK == pPktBuf->SetSize(packetLen))
      {
          // Pack the data into the buffer
          pkt.pack(pPktBuf->GetBuffer(), packetLen);
          
          pPktBuf->SetSize(packetLen); // Update the packet size
          
          writePacket(pPktBuf);
      }
    }

    HX_RELEASE(pPktBuf);

    return HXR_OK;
}

HX_RESULT
RTPUDPTransport::writePacket(IHXBuffer* pSendBuffer)
{    
    if (!m_pUDPSocket)
        return HXR_FAIL;

    m_keepAlive.OnActivity();

    return m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuffer);
}

/*
 * XXXMC
 * Special-case handling for PV clients
 */
HX_RESULT
RTPUDPTransport::sendPVHandshakeResponse(UINT8* pPktPayload)
{
    IHXBuffer* pPktPayloadBuff = NULL;
    m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**) &pPktPayloadBuff);
    if (pPktPayloadBuff)
    {
        DPRINTF(D_INFO, ("RTP: Sending POKE PKT RESPONSE\n")); 
        pPktPayloadBuff->Set((UCHAR*)pPktPayload, 8);
        writePacket(pPktPayloadBuff);
        pPktPayloadBuff->Release();
    }
    return HXR_OK;
}

HX_RESULT
RTPUDPTransport::sendPacket(BasePacket* pPacket)
{
    HX_ASSERT(m_bActive);
    
    HX_RESULT theErr;
    if (m_ulPayloadWirePacket!=0)
    {
      IHXBuffer* pSendBuf = NULL;
      theErr = reflectPacket(pPacket, pSendBuf);
      
      if (HXR_OK == theErr)
      {
          theErr = writePacket(pSendBuf);
          pSendBuf->Release();
      }
      else if (HXR_IGNORE == theErr)
      {
          return HXR_OK;
      }

      return theErr;
    }
    
    IHXBuffer* pPacketBuf = NULL;
    
    theErr = makePacket(pPacket, pPacketBuf);

    if (HXR_OK == theErr)
    {
      theErr = writePacket(pPacketBuf);

      /* send SR if necessary */
      if (HXR_OK == theErr && m_pRTCPTran->m_bSendReport &&
          m_pRTCPTran->m_bSendRTCP)
      {
          m_pRTCPTran->sendSenderReport();
          m_pRTCPTran->m_bSendReport = FALSE;
          m_pRTCPTran->scheduleNextReport();
      }
    }

    HX_RELEASE(pPacketBuf);
    return theErr;
}

RTPUDPTransport::KeepAliveCB::KeepAliveCB(RTPUDPTransport* pTransport):
    m_pTransport(pTransport),
    m_lRefCount(0)
{
    if(m_pTransport)
    {
      m_pTransport->AddRef();
    }
}

RTPUDPTransport::KeepAliveCB::~KeepAliveCB()
{
    HX_RELEASE(m_pTransport);
}

STDMETHODIMP
RTPUDPTransport::KeepAliveCB::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();
        *ppvObj = this;
        return HXR_OK;
    }
    else if (IsEqualIID(riid, IID_IHXCallback))
    {
        AddRef();
        *ppvObj = (IHXCallback*)this;
        return HXR_OK;
    }
    *ppvObj = NULL;
    return HXR_NOINTERFACE;
}

STDMETHODIMP_(UINT32)
RTPUDPTransport::KeepAliveCB::AddRef()
{
    return InterlockedIncrement(&m_lRefCount);
}

STDMETHODIMP_(UINT32)
RTPUDPTransport::KeepAliveCB::Release()
{
    if(InterlockedDecrement(&m_lRefCount) > 0)
    {
      return m_lRefCount;
    }
    delete this;
    return 0;
}

STDMETHODIMP
RTPUDPTransport::KeepAliveCB::Func()
{
    if (m_pTransport)
    {
      m_pTransport->onNATKeepAlive();
    }
    return HXR_OK;
}


/*
 * RTP TCP
 */

RTPTCPTransport::RTPTCPTransport(BOOL bIsSource)
    : RTPBaseTransport(bIsSource)
    , m_pTCPSocket(0)
    , m_tcpInterleave(0)
{ 
    m_wrapSequenceNumber = DEFAULT_WRAP_SEQ_NO;
}

RTPTCPTransport::~RTPTCPTransport()
{
    HX_RELEASE(m_pTCPSocket);
}

void
RTPTCPTransport::Done()
{
    RTPBaseTransport::Done();
}


RTSPTransportTypeEnum
RTPTCPTransport::tag()
{
    return RTSP_TR_RTP_TCP;
}

HX_RESULT
RTPTCPTransport::init(IUnknown* pContext, 
                  IHXTCPSocket* pSocket,
                  IHXRTSPTransportResponse* pResp)
{
    m_pTCPSocket = pSocket;
    m_pTCPSocket->AddRef();
    m_pResp = pResp;
    m_pResp->AddRef();

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pOpt = NULL;
    if (SUCCEEDED(m_pTCPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt)))
    {
      IHXQoSDiffServConfigurator* pCfg = NULL;
      if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg)))
      {
          pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_MEDIA);
          pCfg->Release();
          pCfg = NULL;
      }
      
      pOpt->Release();
      pOpt = NULL;
    }

    HX_RESULT hresult = Init(pContext);
    if (HXR_OK != hresult)
    {
      return hresult;
    }

    RTPBaseTransport::init();

    return HXR_OK;
}

HX_RESULT
RTPTCPTransport::sendPacket(BasePacket* pPacket)
{
    HX_ASSERT(m_bActive);

    HX_RESULT theErr;
    if (m_ulPayloadWirePacket!=0)
    {
      IHXBuffer* pSendBuf = NULL;
      theErr = reflectPacket(pPacket, pSendBuf);
      
      if (HXR_OK == theErr)
      {
          theErr = writePacket(pSendBuf);

          pSendBuf->Release();
      }
      else if (HXR_IGNORE == theErr)
      {
          return HXR_OK;
      }

      return theErr;
    }

    IHXBuffer* pPacketBuf = NULL;
    
    theErr = makePacket(pPacket, pPacketBuf);

    if (HXR_OK == theErr)
    {
      theErr = writePacket(pPacketBuf);

      /* send SR if necessary */
      if (HXR_OK == theErr && m_pRTCPTran->m_bSendReport && 
          m_pRTCPTran->m_bSendRTCP)
      {
          m_pRTCPTran->sendSenderReport();
          m_pRTCPTran->m_bSendReport = FALSE;
          m_pRTCPTran->scheduleNextReport();
      }     
    }

    HX_RELEASE(pPacketBuf);
    return theErr;
}

HX_RESULT
RTPTCPTransport::writePacket(IHXBuffer* pBuf)
{
    if (!m_pTCPSocket)
        return HXR_FAIL;

    // need to put $\000[datalen] in front of packet data

    UINT32 dataLen = pBuf->GetSize();

    if(dataLen > 0xffff)
    {
      return HXR_FAIL;
    }

    //XXXTDM: always true, m_tcpInteleave is signed (why?)
    //HX_ASSERT(0xFF != m_tcpInterleave);

    IHXBuffer* pHeader = NULL;
    m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                          (void**)&pHeader);
    BYTE* pHeaderData;

    if(!pHeader)
    {
        return HXR_OUTOFMEMORY;
    }
    pHeader->SetSize(4);
    pHeaderData = pHeader->GetBuffer();

    pHeaderData[0] = '$';
    pHeaderData[1] = m_tcpInterleave;
    putshort(&pHeaderData[2], (UINT16)dataLen);

    HX_RESULT rc;
    rc = m_pTCPSocket->Write(pHeader);
    if (SUCCEEDED(rc))
    {
        rc = m_pTCPSocket->Write(pBuf);
    }
    if(rc)
    {
        m_pResp->OnProtocolError(HXR_NET_SOCKET_INVALID);
    }
    pHeader->Release();

    return rc;
}


/******************************************************************************
*   RTCP RTCP RTCP RTCP RTCP
******************************************************************************/

RTCPBaseTransport::RTCPBaseTransport(BOOL bIsSource):
    RTSPTransport(bIsSource),
    m_lRefCount(0),
    m_bCallbackPending(FALSE),
    m_pReportCallback(0),
    m_reportTimeoutID(0),
    m_bSchedulerStarted(FALSE),
    m_bSendRTCP(TRUE),
    m_bSendBye(FALSE),
    m_bSendReport(FALSE),
    m_pcCNAME(NULL),
    m_pReportHandler(NULL),
    m_pDataTransport(NULL),
    m_pTSConverter(NULL),
    m_streamNumber(0xffff),
    m_pSignalBus(NULL),
    m_pQoSSignal_RR(NULL),
    m_pQoSSignal_APP(NULL),
    m_pSessionId(NULL),
    m_bSSRCDetermined(FALSE),
    m_ulSSRCDetermined(0)
{
}

RTCPBaseTransport::~RTCPBaseTransport()
{
    HX_DELETE(m_pTSConverter);
}

STDMETHODIMP
RTCPBaseTransport::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();
        *ppvObj = this;
        return HXR_OK;
    }
    else if (IsEqualIID(riid, IID_IHXQoSSignalSourceResponse))
    {
        AddRef();
        *ppvObj = (IHXQoSSignalSourceResponse*)this;
        return HXR_OK;
    }

    *ppvObj = NULL;
    return HXR_NOINTERFACE;
}

STDMETHODIMP_(UINT32)
RTCPBaseTransport::AddRef()
{
    return InterlockedIncrement(&m_lRefCount);
}

STDMETHODIMP_(UINT32)
RTCPBaseTransport::Release()
{
    if(InterlockedDecrement(&m_lRefCount) > 0)
    {
      return m_lRefCount;
    }
    delete this;
    return 0;
}

void
RTCPBaseTransport::Done()
{
    stopScheduler();
    HX_RELEASE(m_pPacketFilter);
    HX_VECTOR_DELETE(m_pcCNAME);
    HX_DELETE(m_pReportHandler);

    HX_RELEASE(m_pQoSSignal_RR);
    HX_RELEASE(m_pQoSSignal_APP);
    HX_RELEASE(m_pSignalBus);
    HX_RELEASE(m_pSessionId);
}

HX_RESULT
RTCPBaseTransport::init()
{
    HX_ASSERT(!m_pReportCallback);
    HX_ASSERT(!m_pcCNAME);
    
    m_pReportCallback = new ReportCallback(this);
    if(!m_pReportCallback)
    {
        return HXR_OUTOFMEMORY;
    }
    m_pReportCallback->AddRef();

    char cname[16] = {0}; /* Flawfinder: ignore */
    itoa(random32(HX_GET_TICKCOUNT()), cname, 10);    
    m_pcCNAME = (BYTE*)new_string(cname);
    HX_ASSERT(m_pcCNAME);
    return HXR_OK;
}

void 
RTCPBaseTransport::addStreamInfo (RTSPStreamInfo* pStreamInfo,
                          UINT32 ulBufferDepth)
{
    UINT32 ulInvalidRate = (UINT32)-1;

    UINT32 ulAvgBitRate = pStreamInfo->m_ulAvgBitRate;
    UINT32 ulRRBitRate = pStreamInfo->m_ulRtpRRBitRate;
    UINT32 ulRSBitRate = pStreamInfo->m_ulRtpRSBitRate;
    BOOL   bUseRFC1889MinTime = FALSE;

    if (!ulAvgBitRate)
    {
      // We don't know the average bitrate.
      // Make something up
      ulAvgBitRate = 20000;
    }
    else
    {
      UINT32 ulRTCPBw = ulAvgBitRate / 20; // 5% of AvgBitRate

      if ((ulRRBitRate == ulInvalidRate) &&
          (ulRSBitRate != ulInvalidRate) &&
          (ulRTCPBw > ulRSBitRate))
      {
          ulRRBitRate = ulRTCPBw - ulRSBitRate;
      }
      else if ((ulRRBitRate != ulInvalidRate) &&
             (ulRSBitRate == ulInvalidRate) &&
             (ulRTCPBw > ulRRBitRate))
      {
          ulRSBitRate = ulRTCPBw - ulRRBitRate;
      }
    }

    if ((ulRRBitRate == ulInvalidRate) ||
      (ulRSBitRate == ulInvalidRate))
    {
      // If one of the bitrates is still
      // invalid at this point we just
      // default to the RFC 1889 behavior.
      // RS = 1.25% of the average bitrate
      // RR = 3.75% of the average bitrate
      
      bUseRFC1889MinTime = TRUE;
      m_bSendRTCP = TRUE;

      ulRSBitRate = ulAvgBitRate / 80; // 1.25%
      ulRRBitRate = ((ulAvgBitRate / 80) * 3 +
                   ((ulAvgBitRate % 80) * 3) / 80); // 3.75%
    }
    else if (ulRRBitRate == 0)
    {
      // We have been told not
      // to send RTCP reports
      m_bSendRTCP = FALSE;
    }

    if (m_pReportHandler)
    {
      // Get the minimum RTCP report interval
      UINT32 ulMinIntervalMs = (bUseRFC1889MinTime) ? 5000 : 1;

      // Pass the report interval parameters to 
      // the report handler
      m_pReportHandler->SetRTCPIntervalParams(ulRSBitRate, ulRRBitRate,
                                    ulMinIntervalMs);
    }
}

void
RTCPBaseTransport::setSSRC(UINT32 ulSSRC)
{
    m_bSSRCDetermined = TRUE;
    m_ulSSRCDetermined = ulSSRC;
}

void
RTCPBaseTransport::setSessionID(const char* pSessionID)
{
    /* cache the session id for use in retrieving signal bus*/
      HX_RELEASE(m_pSessionId);
    if(pSessionID && (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXBuffer,
                                                      (void**)&m_pSessionId))))
    {
      m_pSessionId->Set((UCHAR*)pSessionID, 
                    strlen(pSessionID)+1);


      IHXQoSSignalSource* pSignalSrc = NULL;

      if (m_pSessionId && SUCCEEDED(m_pContext->QueryInterface(IID_IHXQoSSignalSource,
                                                 (void**) &pSignalSrc)))
      {
          pSignalSrc->GetSignalBus(m_pSessionId, (IHXQoSSignalSourceResponse*)this);
          HX_RELEASE(pSignalSrc);
      }
      else
      {
          m_pSignalBus = NULL;
      }
    }
}

STDMETHODIMP
RTCPBaseTransport::SignalBusReady (HX_RESULT hResult, IHXQoSSignalBus* pBus, 
                           IHXBuffer* pSessionId)
{
    if (FAILED(hResult))
    {
      HX_ASSERT(0);
      return HXR_OK;
    }

    HX_RELEASE(m_pSignalBus);
    m_pSignalBus = pBus;
    m_pSignalBus->AddRef();

    if (m_pDataTransport)
    {
        HX_RELEASE(m_pDataTransport->m_pQoSInfo);
        if (FAILED(m_pSignalBus->QueryInterface(IID_IHXQoSTransportAdaptationInfo,
                                                (void**)&m_pDataTransport->m_pQoSInfo)))
        {
            m_pDataTransport->m_pQoSInfo = NULL;
        }
    }
    else
    {
        HX_ASSERT(0);
    }

    HX_RELEASE(m_pQoSSignal_RR);
    if (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXQoSSignal,
                                          (void**)&m_pQoSSignal_RR)))
    {
      m_pQoSSignal_RR->SetId(MAKE_HX_QOS_SIGNAL_ID(HX_QOS_SIGNAL_LAYER_FRAMING_TRANSPORT,
                                      HX_QOS_SIGNAL_RELEVANCE_METRIC,
                                      HX_QOS_SIGNAL_RTCP_RR));
    }
    else
    {
      HX_ASSERT(0);
      m_pQoSSignal_RR = NULL;
    }

    HX_RELEASE(m_pQoSSignal_RR);
    if (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXQoSSignal,
                                          (void**)&m_pQoSSignal_APP)))
    {
        m_pQoSSignal_APP->SetId(MAKE_HX_QOS_SIGNAL_ID(HX_QOS_SIGNAL_LAYER_FRAMING_TRANSPORT,
                                           HX_QOS_SIGNAL_RELEVANCE_METRIC,
                                           HX_QOS_SIGNAL_COMMON_BUFSTATE));
    }
    else
    {
      HX_ASSERT(0);
      m_pQoSSignal_APP = NULL;
    }

    return HXR_OK;
}

HX_RESULT 
RTCPBaseTransport::SetTSConverter(CHXTimestampConverter::ConversionFactors conversionFactors)
{
    HX_DELETE(m_pTSConverter);

    m_pTSConverter = new CHXTimestampConverter(conversionFactors);

    return m_pTSConverter ? HXR_OK : HXR_OUTOFMEMORY;
}

HX_RESULT
RTCPBaseTransport::startScheduler()
{
    if(!m_bSchedulerStarted && m_bSendRTCP)
    {
      HX_ASSERT(!m_bCallbackPending);
      m_bSchedulerStarted = TRUE;
      if (!m_bMulticast)
      {
          // we wanna send the report right away!
          m_bSendReport = TRUE;
      }
      else
      {    
          if (!m_bCallbackPending)
          {
            scheduleNextReport();
          }
      }
    }

    return HXR_OK;
}

HX_RESULT
RTCPBaseTransport::stopScheduler()
{
    if(m_bCallbackPending)
    {
      HX_ASSERT(m_pScheduler);
      m_pScheduler->Remove(m_reportTimeoutID);
      m_bCallbackPending = FALSE;
    }
    HX_RELEASE(m_pReportCallback);

    return HXR_OK;
}

void
RTCPBaseTransport::scheduleNextReport()
{
    if (m_bSendRTCP)
    {
      HX_ASSERT(!m_bSendReport);
      HX_ASSERT(!m_bCallbackPending);
      HX_ASSERT(m_pReportCallback);
      
      HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
      Timeval tvNow((INT32)rmatv.tv_sec, (INT32)rmatv.tv_usec);
      
      tvNow += Timeval(m_pReportHandler->GetRTCPInterval());
      
      rmatv.tv_sec = tvNow.tv_sec;
      rmatv.tv_usec = tvNow.tv_usec;
      m_reportTimeoutID = 
          m_pScheduler->AbsoluteEnter(m_pReportCallback, rmatv);
      m_bCallbackPending = TRUE;    
    }
}

/*
*   we don't have a table of sender or receivers because we don't yet 
*   support multicast.  i.e. only one sender, one receiver
*/
HX_RESULT
RTCPBaseTransport::handlePacket(IHXBuffer* pBuffer)
{
    // we need to deal with a compund packet
    RTCPUnPacker unpacker;

//{FILE* f1 = ::fopen("c:\\temp\\all.txt", "a+"); ::fprintf(f1, "this: %p RTCPTransport::handlePacket(): ", this);::fclose(f1);}      

    if (HXR_OK != unpacker.UnPack(pBuffer))
    {
      // failed...don't do anything more...still ok to return HXR_OK;
      return HXR_OK;    
    }

    /* update */
    m_pReportHandler->UpdateAvgRTCPSize(pBuffer->GetSize());

    HX_RESULT theErr = HXR_OK;
    RTCPPacket* pPkt = NULL;
    HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
    UINT32 ulNow = rmatv.tv_sec * 1000 + rmatv.tv_usec / 1000;

    // for EOS support
    BOOL bBye = FALSE;
    APPItem* pAppPkt = NULL;

    while (HXR_OK == unpacker.Get(pPkt))
    {
//{FILE* f1 = ::fopen("c:\\temp\\all.txt", "a+"); ::fprintf(f1, "%u\n", pPkt->packet_type);::fclose(f1);}  
        if (m_bIsSource || (m_bSSRCDetermined && m_ulSSRCDetermined == pPkt->sr_ssrc))
        {    
          // deal with it!
          switch(pPkt->packet_type)
          {
              case RTCP_SR:
              {
                DPRINTF(D_INFO, ("RTCP: SenderReport received\n"));

                m_pReportHandler->OnRTCPReceive(pPkt, ulNow);

                m_pDataTransport->handleRTCPSync(NTPTime(pPkt->ntp_sec, 
                                               pPkt->ntp_frac),
                                         pPkt->rtp_ts);
              }
              break;
    
              case RTCP_RR:
              {
                DPRINTF(D_INFO, ("RTCP: ReceiverReport received\n"));

                m_pReportHandler->OnRTCPReceive(pPkt, ulNow);
                
                IHXBuffer* pTmp = NULL;
                if((m_pSignalBus) && SUCCEEDED(m_pCommonClassFactory->
                                         CreateInstance(CLSID_IHXBuffer,
                                                    (void**)&pTmp)))
                {
                    for (UINT32 i = 0; i < pPkt->count; i++)
                    {
                      if (pPkt->rr_data[i].ssrc == m_pReportHandler->GetSSRC())
                      {
                          pTmp->SetSize(sizeof(ReceptionReport));
                          ReceptionReport* pRR = (ReceptionReport*)pTmp->GetBuffer();

                          HX_ASSERT(pRR);

                          //replace the ssrc with the server specific stream number
                          pRR->ssrc     = m_streamNumber;
                          //copy in the remaining receiver report data
                          pRR->fraction = pPkt->rr_data[i].fraction;
                          pRR->lost     = pPkt->rr_data[i].lost;
                          pRR->last_seq = pPkt->rr_data[i].last_seq;
                          pRR->lsr      = (m_pDataTransport) ? 
                            m_pDataTransport->MapLSR(pPkt->rr_data[i].lsr) : 0;
                          pRR->dlsr     = pPkt->rr_data[i].dlsr;

                        if (m_pDataTransport && m_pDataTransport->m_pQoSInfo)
                        {
                            m_pDataTransport->m_pQoSInfo->SetPacketLoss(pRR->lost);
                        }

                          m_pQoSSignal_RR->SetValue(pTmp);
                          m_pSignalBus->Send(m_pQoSSignal_RR);
                      }
                    }
                }
                HX_RELEASE(pTmp);
              }
              break;
    
              case RTCP_SDES:
              {
                DPRINTF(D_INFO, ("RTCP: SDESReport received\n"));
                m_pReportHandler->OnRTCPReceive(pPkt, ulNow);
              }
              break;
    
              case RTCP_BYE:
              {
                DPRINTF(D_INFO, ("RTCP: BYE received\n"));
                m_pReportHandler->OnRTCPReceive(pPkt, ulNow);

                bBye = TRUE;
              }
              break;
    
              case RTCP_APP:
              {
                DPRINTF(D_INFO, ("RTCP: APP received\n"));
                // make sure this APP is from RS.
                // Hmmm...This is a security risk...Anyone can send APP pkt 
                // with "RNWK"...
                if ((0 != strncmp((const char*)pPkt->app_name, "RNWK", 4)) &&
                    (0 != strncmp((const char*)pPkt->app_name, "HELX", 4)))
                {
                    // unknown APP, ignore it.
                    break;
                }

                if (!(pPkt->m_pAPPItems))
                {
                    break;
                }

                HX_ASSERT(1 == pPkt->count);
                pAppPkt = new APPItem();
                    if(!pAppPkt)
                    {
                        theErr = HXR_OUTOFMEMORY;
                        break;
                    }

                if ((pPkt->m_pAPPItems[0]).app_type == APP_BUFINFO)
                {
                    IHXBuffer* pTmp = NULL;
                    if((m_pSignalBus) && SUCCEEDED(m_pCommonClassFactory->
                                           CreateInstance(CLSID_IHXBuffer,
                                                      (void**)&pTmp)))
                    {
    //                  pTmp->Set((UCHAR*)(&pPkt->m_pAPPItems[0]), sizeof(APPItem));
                            pTmp->SetSize(sizeof(BufferMetricsSignal));
                        BufferMetricsSignal* pbufSig = 
                          (BufferMetricsSignal*)pTmp->GetBuffer();

    // The correct mapping from SSRC to stream number relies on the port multiplexing
    // used by the current implementation. If we use SSRC-based multiplexing, we will
    // need to take a look again at this mapping.                        
                            pbufSig->m_ulStreamNumber = m_streamNumber;
                            pbufSig->m_ulLowTimestamp = pPkt->m_pAPPItems[0].lowest_timestamp;
                      pbufSig->m_ulHighTimestamp = pPkt->m_pAPPItems[0].highest_timestamp;
                      pbufSig->m_ulBytes = pPkt->m_pAPPItems[0].bytes_buffered;
                            m_pQoSSignal_APP->SetValue(pTmp);
                      m_pSignalBus->Send(m_pQoSSignal_APP);
                      HX_RELEASE(pTmp);
                    }
                }
                else
                {
                    memcpy(pAppPkt, &pPkt->m_pAPPItems[0], sizeof(APPItem)); /* Flawfinder: ignore */
                }
              }
              break;
    
              default:
              {
                DPRINTF(D_INFO, ("RTCP: Don't know this pkt type\n"));      
              }   
          } 
        }

      // Im responsible of freeing the pkt
      HX_DELETE(pPkt);
    }

    if ((bBye) && (!m_bIsSource))
    {
      HX_ASSERT(m_streamNumber == m_pDataTransport->m_streamNumber);
      RTSPTransportBuffer* pTransportBuffer = 
          m_pDataTransport->getTransportBuffer(m_pDataTransport->m_streamNumber);
      
      if (pTransportBuffer)
      {
          if (pAppPkt && (APP_EOS == pAppPkt->app_type))
          { 
            pTransportBuffer->SetEndPacket(pAppPkt->seq_no,
                                     0,
                                     pAppPkt->packet_sent,
                                     pAppPkt->timestamp);
          }
          else
          {
            pTransportBuffer->InformSourceStopped();
          }
      }
      else
      {
          HX_ASSERT(!"can't find the transport buffer");
          theErr = HXR_FAIL;
      }
    }

    HX_DELETE(pAppPkt);
    return theErr;
}

/*
*   
*
*/
HX_RESULT
RTCPBaseTransport::makeBye(REF(IHXBuffer*) pSendBuf)
{
    HX_ASSERT(m_pDataTransport->m_streamNumber == m_streamNumber);    
    HX_ASSERT(m_bSendBye);

    // consider it sent...
    m_bSendBye = FALSE;
    
    HX_RESULT     theErr = HXR_FAIL;
//    IHXBuffer* pSendBuf = NULL;
    RTCPPacket* pPktSDES = NULL;
    RTCPPacket* pPktBYE  = NULL;
    RTCPPacket* pPktAPP  = NULL;

    HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
    UINT32 ulNow = rmatv.tv_sec*1000 + rmatv.tv_usec/1000;
    Timeval tvNow((INT32) rmatv.tv_sec, (INT32)rmatv.tv_usec);
    
    RTCPPacker    packer;
    
    RTCPPacket* pPktR = new RTCPPacket();
    if(!pPktR)
    {
        theErr = HXR_OUTOFMEMORY;
      goto bail;
    }
    if (m_bIsSource)
    {
      theErr = m_pReportHandler->MakeSR(pPktR, tvNow);
    }
    else
    {
      theErr = m_pReportHandler->MakeRR(pPktR, ulNow);
    }

    if (HXR_OK != theErr)
    {
      // no SR/RR, no RTCP
      goto bail;
    }

    pPktSDES = new RTCPPacket();
    if(!pPktSDES)
    {
        theErr = HXR_OUTOFMEMORY;
      goto bail;
    }
    theErr = m_pReportHandler->MakeSDES(pPktSDES, m_pcCNAME);
    if (HXR_OK != theErr)
    {
      goto bail;
    }

    pPktBYE = new RTCPPacket();
    if(!pPktBYE)
    {
        theErr = HXR_OUTOFMEMORY;
      goto bail;
    }
    theErr = m_pReportHandler->MakeBye(pPktBYE);
    if (HXR_OK != theErr)
    {
      goto bail;  
    }

    // if it is source, we need to make EOS pkt
    if (m_bIsSource)
    {
      pPktAPP = new RTCPPacket();
        if(!pPktAPP)
        {
            theErr = HXR_OUTOFMEMORY;
          goto bail;
        }
      theErr = m_pReportHandler->MakeEOSApp(pPktAPP);
      if (HXR_OK != theErr)
      {
          goto bail;
      }   
    }
    // pack them up!
    theErr = m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                       (void**)&pSendBuf);
    if (HXR_OK != theErr)
    {
      HX_ASSERT(!pSendBuf);
      goto bail;
    }

    packer.Set(pPktR);
    packer.Set(pPktSDES);
    packer.Set(pPktBYE);
    if (m_bIsSource)
    {
      HX_ASSERT(pPktAPP);
      packer.Set(pPktAPP);
    }
    theErr = packer.Pack(pSendBuf);
                                   
    if (HXR_OK != theErr)
    {
      HX_ASSERT(FALSE && "failed to create Report/BYE RTCP pkt");
      goto bail;
    }
bail:
    HX_DELETE(pPktR);
    HX_DELETE(pPktSDES);
    HX_DELETE(pPktBYE);
    HX_DELETE(pPktAPP);
//    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPBaseTransport::makeSenderReport(REF(IHXBuffer*) pSendBuf)
{
    // create SR
    // create SDES
    // create compound RTCP
    // send
    // no reception report on the server

    // if no pkt has been sent, wait!
    HX_ASSERT(m_pDataTransport->m_streamNumber == m_streamNumber);    
    RTSPStreamData* pStreamData = 
          m_pDataTransport->m_pStreamHandler->getStreamData(m_pDataTransport->m_streamNumber);
    
    if(!pStreamData || !pStreamData->m_packetSent)
    {
      return HXR_FAIL;
    }

    HX_ASSERT(pStreamData->m_streamNumber == m_streamNumber);

    HX_RESULT theErr = HXR_FAIL;
//    IHXBuffer* pSendBuf = NULL;
    RTCPPacket* pPktSDES = NULL;

    HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
    UINT32 ulNow = rmatv.tv_sec*1000 + rmatv.tv_usec/1000;
    Timeval tvNow((INT32) rmatv.tv_sec, (INT32)rmatv.tv_usec);

    RTCPPacker packer;

    RTCPPacket* pPktSR = new RTCPPacket();
    if(!pPktSR)
    {
        theErr = HXR_OUTOFMEMORY;
      goto bail;
    }

    theErr = m_pReportHandler->MakeSR(pPktSR, tvNow);
    if (HXR_OK != theErr)
    {
      goto bail;
    }
    
    pPktSDES = new RTCPPacket();
    if(!pPktSDES)
    {
        theErr = HXR_OUTOFMEMORY;
      goto bail;
    }
    theErr = m_pReportHandler->MakeSDES(pPktSDES, m_pcCNAME);
    if (HXR_OK != theErr)
    {
      goto bail;
    }
    
    // pack them up!
    theErr = m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                       (void**)&pSendBuf);
    if (HXR_OK != theErr)
    {
      HX_ASSERT(!pSendBuf);
      goto bail;
    }
                                   
    packer.Set(pPktSR);
    packer.Set(pPktSDES);
    theErr = packer.Pack(pSendBuf);

    if (HXR_OK != theErr)
    {
      HX_ASSERT(FALSE && "failed to create SR/SDES RTCP pkt");
      goto bail;
    }

//    theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
    m_pReportHandler->UpdateAvgRTCPSize(pSendBuf->GetSize());
    m_bSendBye = TRUE;
bail:
    HX_DELETE(pPktSR);
    HX_DELETE(pPktSDES);
//    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPBaseTransport::makeReceiverReport(REF(IHXBuffer*) pSendBuf)
{
    // create RR
    // create SDES
    // create BufferInfo
    // create compund RTCP
    // send

    HX_ASSERT(m_pDataTransport->m_streamNumber == m_streamNumber);    

    HX_RESULT theErr = HXR_FAIL;
//    IHXBuffer* pSendBuf = NULL;
    RTCPPacket* pPktSDES = NULL;
    RTCPPacket* pPktBufInfo = NULL;

    HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
    UINT32 ulNow = rmatv.tv_sec*1000 + rmatv.tv_usec/1000;

    INT64 llLowTS = 0;
    INT64 llHighTS = 0;
    UINT32 ulBytesBuffered = 0;
    BOOL bDone = FALSE;

    RTCPPacker packer;

    RTCPPacket* pPktRR = new RTCPPacket();

    if(!pPktRR)
    {
      theErr = HXR_OUTOFMEMORY;
      goto bail;
    }

    theErr = m_pReportHandler->MakeRR(pPktRR, ulNow);
    if (HXR_OK != theErr)
    {
      goto bail;
    }
    
    pPktSDES = new RTCPPacket();
    if(!pPktSDES)
    {
      theErr = HXR_OUTOFMEMORY;
      goto bail;
    }
    theErr = m_pReportHandler->MakeSDES(pPktSDES, m_pcCNAME);
    if (HXR_OK != theErr)
    {
      goto bail;
    }

    pPktBufInfo = new RTCPPacket();
    
    // Get buffer info from m_pSrcBufferStats
      
    if (!m_pSrcBufferStats || !m_pReportHandler ||
      HXR_OK != m_pSrcBufferStats->GetTotalBuffering(m_streamNumber, 
                                           llLowTS, llHighTS,
                                           ulBytesBuffered,
                                           bDone) ||
      HXR_OK != m_pReportHandler->MakeBufInfoApp(pPktBufInfo, 
                                       INT64_TO_UINT32(llLowTS), 
                                       INT64_TO_UINT32(llHighTS),
                                       ulBytesBuffered))
    {
      // If we failed for some reason,
      // just delete the packet so that
      // it doesn't get included in the
      // compound packet. Failing to
      // report buffer info is not a 
      // critical error
      HX_DELETE(pPktBufInfo);
    }

    // pack them up!
    theErr = m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                       (void**)&pSendBuf);
    if (HXR_OK != theErr)
    {
      HX_ASSERT(!pSendBuf);
      goto bail;
    }

    packer.Set(pPktRR);
    packer.Set(pPktSDES);

    if (pPktBufInfo)
    {
      packer.Set(pPktBufInfo);
    }

    theErr = packer.Pack(pSendBuf);
    
    if (HXR_OK != theErr)
    {
      HX_ASSERT(FALSE && "failed to create SR/SDES RTCP pkt");
      goto bail;
    }

//    theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
    m_pReportHandler->UpdateAvgRTCPSize(pSendBuf->GetSize());
    m_bSendBye = TRUE;

bail:
    HX_DELETE(pPktRR);
    HX_DELETE(pPktSDES);
    HX_DELETE(pPktBufInfo);
//    HX_RELEASE(pSendBuf);
    return theErr;
}

RTCPBaseTransport::ReportCallback::ReportCallback(RTCPBaseTransport* pTransport):
    m_pTransport(pTransport),
    m_lReportRefCount(0)
{
    if(m_pTransport)
    {
      m_pTransport->AddRef();
    }
}

RTCPBaseTransport::ReportCallback::~ReportCallback()
{
    HX_RELEASE(m_pTransport);
}

STDMETHODIMP
RTCPBaseTransport::ReportCallback::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();
        *ppvObj = this;
        return HXR_OK;
    }
    else if (IsEqualIID(riid, IID_IHXCallback))
    {
        AddRef();
        *ppvObj = (IHXCallback*)this;
        return HXR_OK;
    }
    *ppvObj = NULL;
    return HXR_NOINTERFACE;
}

STDMETHODIMP_(UINT32)
RTCPBaseTransport::ReportCallback::AddRef()
{
    return InterlockedIncrement(&m_lReportRefCount);
}

STDMETHODIMP_(UINT32)
RTCPBaseTransport::ReportCallback::Release()
{
    if(InterlockedDecrement(&m_lReportRefCount) > 0)
    {
      return m_lReportRefCount;
    }
    delete this;
    return 0;
}

STDMETHODIMP
RTCPBaseTransport::ReportCallback::Func()
{
    HX_ASSERT(!m_pTransport->m_bSendReport);
    HX_ASSERT(m_pTransport->m_bCallbackPending);
    m_pTransport->m_bCallbackPending = FALSE;

    if (m_pTransport->m_bSendRTCP)
    {
      m_pTransport->m_bSendReport = TRUE;
    }

    return HXR_OK;
}

/*
 *  RTCP UDP
 */

RTCPUDPTransport::RTCPUDPTransport(BOOL bIsSource)
    : RTCPBaseTransport(bIsSource)
    , m_pUDPSocket(NULL)
    , m_foreignAddr(0)
    , m_foreignPort(0)
    , m_ulCurrentMulticastAddress(0)
    , m_ulCurrentMulticastPort(0)
    , m_pMCastUDPSocket(NULL)
{
}

RTCPUDPTransport::~RTCPUDPTransport()
{
    Done();
}

void
RTCPUDPTransport::Done()
{    
    if (m_bSendBye)
    {
      sendBye();
    } 

    m_keepAlive.reset();

    if (m_pMCastUDPSocket)
    {
        m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    HX_RELEASE(m_pMCastUDPSocket);
    HX_RELEASE(m_pUDPSocket);
    HX_RELEASE(m_pDataTransport);
    RTCPBaseTransport::Done();
}

RTSPTransportTypeEnum
RTCPUDPTransport::tag()
{
    return RTSP_TR_RTCP;
}

HX_RESULT
RTCPUDPTransport::init(IUnknown* pContext,
                IHXUDPSocket* pSocket,
                RTPUDPTransport* pDataTransport,
                IHXRTSPTransportResponse* pResp,
                UINT16 streamNumber)                
{

    m_pUDPSocket = pSocket;
    m_pUDPSocket->AddRef();
    m_pDataTransport = pDataTransport;
    m_pDataTransport->AddRef();
    m_pResp = pResp;
    pResp->AddRef();

    m_streamNumber = streamNumber;

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pOpt = NULL;
    if (SUCCEEDED(m_pUDPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt)))
    {
      IHXQoSDiffServConfigurator* pCfg = NULL;
      if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg)))
      {
          pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_CONTROL);
          pCfg->Release();
          pCfg = NULL;
      }
      
      pOpt->Release();
      pOpt = NULL;
    }
    
    HX_RESULT hresult = Init(pContext);
    if(HXR_OK != hresult)
    {
      return hresult;
    }

    RTCPBaseTransport::init();
    
    return HXR_OK;
}

void
RTCPUDPTransport::setForeignAddress(UINT32 foreignAddr, UINT16 foreignPort)
{
    m_foreignAddr = foreignAddr;
    m_foreignPort = foreignPort;

    UINT32 natTimeout = GetNATTimeout(m_pContext);

    if (!m_bIsSource && natTimeout)
    {
      // Initialize keepalive object
      m_keepAlive.Init(m_pScheduler, natTimeout, new KeepAliveCB(this));
    
      // Do initial "poke" through the NAT
      onNATKeepAlive();
    }
}

HX_RESULT RTCPUDPTransport::handlePacket(IHXBuffer* pBuffer)
{
    m_keepAlive.OnActivity();

    return RTCPBaseTransport::handlePacket(pBuffer);
}

void 
RTCPUDPTransport::JoinMulticast(UINT32 ulAddress, UINT32 ulPort, IHXUDPSocket* pUDP)
{
    if (m_ulCurrentMulticastAddress)
    {
      m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    else
    {
      m_pMCastUDPSocket = pUDP;
        m_pMCastUDPSocket->AddRef();
    }

    m_pMCastUDPSocket->JoinMulticastGroup(ulAddress, HXR_INADDR_ANY);
    m_bMulticast = TRUE;
    m_ulCurrentMulticastAddress = ulAddress;
    m_ulCurrentMulticastPort = ulPort;

    if (m_pStreamHandler)
    {
      RTSPStreamData* pStreamData = m_pStreamHandler->firstStreamData();

      ASSERT(pStreamData);

      while(pStreamData)
      {
          pStreamData->m_pTransportBuffer->SetMulticast();
          pStreamData = m_pStreamHandler->nextStreamData();
      }
    }

    return;
}

HX_RESULT RTCPUDPTransport::onNATKeepAlive()
{
    DPRINTF(D_INFO, ("RTCP: onNATKeepAlive()\n"));

    if (m_bSendRTCP)
    {
      // Send an early receiver report to keep
      // the NAT port open
      sendReceiverReport();
    }

    return HXR_OK;
}

HX_RESULT
RTCPUDPTransport::streamDone(UINT16 streamNumber)
{
    HX_ASSERT(streamNumber == m_streamNumber);
    HX_ASSERT(streamNumber == m_pDataTransport->m_streamNumber);

    // this will be called from RTPUDPTransport::streamDone();
    if (m_bSendBye)
    {
      sendBye();
    } 
    return HXR_OK;
}

/*
 *  We don't really konw what this RTCP pkt is...Simply reflect.
 */
HX_RESULT
RTCPUDPTransport::reflectRTCP(IHXBuffer* pSendBuf)
{
    HX_ASSERT(pSendBuf);
    return m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);    
}

HX_RESULT
RTCPUDPTransport::sendSenderReport()
{
    HX_ASSERT(m_bIsSource);
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeSenderReport(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPUDPTransport::sendReceiverReport()
{
    HX_ASSERT(!m_bIsSource);
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeReceiverReport(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPUDPTransport::sendBye()
{
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeBye(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      if (m_bIsSource)
      {
          // we don't want this to get lost since a client will request for 
          // TEARDOWN upon a reception of this report.
          for (UINT32 i = 0; i < 5 && HXR_OK == theErr; i++)
          {
            theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
          }     
      }     
      else
      {
          theErr = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuf);
      }
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}

RTCPUDPTransport::KeepAliveCB::KeepAliveCB(RTCPUDPTransport* pTransport):
    m_pTransport(pTransport),
    m_lRefCount(0)
{
    if(m_pTransport)
    {
      m_pTransport->AddRef();
    }
}

RTCPUDPTransport::KeepAliveCB::~KeepAliveCB()
{
    HX_RELEASE(m_pTransport);
}

STDMETHODIMP
RTCPUDPTransport::KeepAliveCB::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();
        *ppvObj = this;
        return HXR_OK;
    }
    else if (IsEqualIID(riid, IID_IHXCallback))
    {
        AddRef();
        *ppvObj = (IHXCallback*)this;
        return HXR_OK;
    }
    *ppvObj = NULL;
    return HXR_NOINTERFACE;
}

STDMETHODIMP_(UINT32)
RTCPUDPTransport::KeepAliveCB::AddRef()
{
    return InterlockedIncrement(&m_lRefCount);
}

STDMETHODIMP_(UINT32)
RTCPUDPTransport::KeepAliveCB::Release()
{
    if(InterlockedDecrement(&m_lRefCount) > 0)
    {
      return m_lRefCount;
    }
    delete this;
    return 0;
}

STDMETHODIMP
RTCPUDPTransport::KeepAliveCB::Func()
{
    if (m_pTransport)
    {
      m_pTransport->onNATKeepAlive();
    }
    return HXR_OK;
}

/*
 *  RTCP TCP
 */

RTCPTCPTransport::RTCPTCPTransport(BOOL bIsSource)
    : RTCPBaseTransport(bIsSource)
    , m_pTCPSocket(NULL)
    , m_tcpInterleave((INT8)0xFF)
{
}

RTCPTCPTransport::~RTCPTCPTransport()
{
    Done();
}

void
RTCPTCPTransport::Done()
{    
    if (m_bSendBye)
    {
      sendBye();
    } 

    HX_RELEASE(m_pTCPSocket);
    HX_RELEASE(m_pDataTransport);
    RTCPBaseTransport::Done();
}

RTSPTransportTypeEnum
RTCPTCPTransport::tag()
{
    return RTSP_TR_RTCP;
}

HX_RESULT
RTCPTCPTransport::init(IUnknown* pContext,
                IHXTCPSocket* pSocket,
                RTPTCPTransport* pDataTransport,
                IHXRTSPTransportResponse* pResp,
                UINT16 streamNumber)                
{
    m_pTCPSocket = pSocket;
    m_pTCPSocket->AddRef();
    m_pDataTransport = pDataTransport;
    m_pDataTransport->AddRef();
    
    m_pResp = pResp;
    m_pResp->AddRef();

    m_streamNumber = streamNumber;

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pOpt = NULL;
    if (SUCCEEDED(m_pTCPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt)))
    {
      IHXQoSDiffServConfigurator* pCfg = NULL;
      if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg)))
      {
          pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_CONTROL);
          pCfg->Release();
          pCfg = NULL;
      }
      
      pOpt->Release();
      pOpt = NULL;
    }
    
    HX_RESULT hresult = Init(pContext);
    if(HXR_OK != hresult)
    {
      return hresult;
    }

    RTCPBaseTransport::init();
    
    return HXR_OK;
}


HX_RESULT
RTCPTCPTransport::streamDone(UINT16 streamNumber)
{
    HX_ASSERT(streamNumber == m_streamNumber);
    HX_ASSERT(streamNumber == m_pDataTransport->m_streamNumber);

    // this will be called from RTPUDPTransport::streamDone();
    if (m_bSendBye)
    {
      sendBye();
    } 
    return HXR_OK;
}

HX_RESULT
RTCPTCPTransport::reflectRTCP(IHXBuffer* pSendBuf)
{
    HX_ASSERT(pSendBuf);
    HX_RESULT theErr = writePacket(pSendBuf);    
    if (theErr)
    {
      m_pResp->OnProtocolError(HXR_NET_SOCKET_INVALID);     
    }

    return theErr;
}

HX_RESULT
RTCPTCPTransport::sendSenderReport()
{
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeSenderReport(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      theErr = writePacket(pSendBuf);
      if(theErr)
      {
          m_pResp->OnProtocolError(HXR_NET_SOCKET_INVALID);
      }     
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPTCPTransport::sendReceiverReport()
{
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeReceiverReport(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      theErr = writePacket(pSendBuf);
      if(theErr)
      {
          m_pResp->OnProtocolError(HXR_NET_SOCKET_INVALID);
      }     
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}

HX_RESULT
RTCPTCPTransport::sendBye()
{
    HX_RESULT theErr;
    IHXBuffer* pSendBuf = NULL;

    theErr = makeBye(pSendBuf);
    if (HXR_OK == theErr)
    {
      HX_ASSERT(pSendBuf);
      theErr = writePacket(pSendBuf);
      /*
       * this write will fail if a client initiated a teardown before the end 
       * of a clip because by the time we send BYE, a sock is gone!  So, don't
       * worry about return code here.
       */
    } 

    HX_RELEASE(pSendBuf);
    return theErr;
}


HX_RESULT
RTCPTCPTransport::writePacket(IHXBuffer* pBuf)
{
    if (!m_pTCPSocket)
        return HXR_FAIL;

    // need to put $\000[datalen] in front of packet data    
    HX_ASSERT(pBuf);
    UINT32 dataLen = pBuf->GetSize();

    if(dataLen > 0xffff)
    {
      return HXR_FAIL;
    }

    //XXXTDM: always true, m_tcpInteleave is signed (why?)
    //HX_ASSERT(0xFF != m_tcpInterleave);

    IHXBuffer* pHeader = NULL;
    m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, 
                                          (void**)&pHeader);
    BYTE* pHeaderData;

    if(!pHeader)
    {
        return HXR_OUTOFMEMORY;
    }
    pHeader->SetSize(4);
    pHeaderData = pHeader->GetBuffer();

    pHeaderData[0] = '$';
    pHeaderData[1] = m_tcpInterleave;
    putshort(&pHeaderData[2], (UINT16)dataLen);

    HX_RESULT rc;
    rc = m_pTCPSocket->Write(pHeader);
    if (SUCCEEDED(rc))
    {
        rc = m_pTCPSocket->Write(pBuf);
    }
    pHeader->Release();

    return rc;
}

Generated by  Doxygen 1.6.0   Back to index