// Logo Search packages:
// Sourcecode: helix-player version File versions
//
// ppffsrc.cpp

/* ***** BEGIN LICENSE BLOCK *****
 * Source last modified: $Id: ppffsrc.cpp,v 1.3.30.1 2004/07/19 21:04:16 hubbe Exp $
 * 
 * Portions Copyright (c) 1995-2004 RealNetworks, Inc. All Rights Reserved.
 * 
 * The contents of this file, and the files included with this file,
 * are subject to the current version of the RealNetworks Public
 * Source License (the "RPSL") available at
 * http://www.helixcommunity.org/content/rpsl unless you have licensed
 * the file under the current version of the RealNetworks Community
 * Source License (the "RCSL") available at
 * http://www.helixcommunity.org/content/rcsl, in which case the RCSL
 * will apply. You may also obtain the license terms directly from
 * RealNetworks.  You may not use this file except in compliance with
 * the RPSL or, if you have a valid RCSL with RealNetworks applicable
 * to this file, the RCSL.  Please see the applicable RPSL or RCSL for
 * the rights, obligations and limitations governing use of the
 * contents of the file.
 * 
 * Alternatively, the contents of this file may be used under the
 * terms of the GNU General Public License Version 2 or later (the
 * "GPL") in which case the provisions of the GPL are applicable
 * instead of those above. If you wish to allow use of your version of
 * this file only under the terms of the GPL, and not to allow others
 * to use your version of this file under the terms of either the RPSL
 * or RCSL, indicate your decision by deleting the provisions above
 * and replace them with the notice and other provisions required by
 * the GPL. If you do not delete the provisions above, a recipient may
 * use your version of this file under the terms of any one of the
 * RPSL, the RCSL or the GPL.
 * 
 * This file is part of the Helix DNA Technology. RealNetworks is the
 * developer of the Original Code and owns the copyrights in the
 * portions it created.
 * 
 * This file, and the files included with this file, is distributed
 * and made available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY
 * KIND, EITHER EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS
 * ALL SUCH WARRANTIES, INCLUDING WITHOUT LIMITATION, ANY WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, QUIET
 * ENJOYMENT OR NON-INFRINGEMENT.
 * 
 * Technology Compatibility Kit Test Suite(s) Location:
 *    http://www.helixcommunity.org/content/tck
 * 
 * Contributor(s):
 * 
 * ***** END LICENSE BLOCK ***** */

/////////////////////////////////////////////////////////////////////////////
//  Defines
//
// #define _SYNC_TRACE

// NOTE(review): neither of these two limits is referenced in the visible
// part of this file; presumably they bound per-stream packet buffering --
// confirm against their users.
#define MAX_BUFFERED_STREAM_PACKETS 30000
#define UNLOCK_THRESHOLD_LEVEL            15000

// While this is defined, GetHeader() does not add the "NonContigTime"
// property to headers of non-RM sources.
#define _USE_MASTER_STREAM_SYNC_SCHEME

// MIME type prefixes used to classify non-RM streams as audio or video.
// The *_SIZE macros give the prefix length without the terminating NUL.
#define AUDIO_MIME_PREFIX     "audio/"
#define AUDIO_MIME_PREFIX_SIZE      (sizeof(AUDIO_MIME_PREFIX) - 1)

#define VIDEO_MIME_PREFIX     "video/"
#define VIDEO_MIME_PREFIX_SIZE      (sizeof(VIDEO_MIME_PREFIX) - 1)

#define STARTING_HX_TIMESTAMP 100000          // in ms


/////////////////////////////////////////////////////////////////////////////
//  Includes
//
#include "hxtypes.h"
#include "hxcom.h"
#include "hxcomm.h"
#include "hxprefs.h"
#include "hxengin.h"
#include "ihxpckts.h"
#include "hxerror.h"
#include "rtptypes.h"
#include "rtpwrap.h"
#include "hxrand.h"
#include "hxslist.h"
#include "hxstring.h"
#include "hxstrutl.h"
#include "chxpckts.h"
#include "hxbuffer.h"
#include "hxmime.h"
#include "tconverter.h"
#include "asmrulep.h"
#include "hxtick.h"
#include "netbyte.h"
#include "rtcpmisc.h"
#include "hxmon.h"
//#include "hxengin.h"
#include "hxmarsh.h"
#include "mimechk.h"

#include "scalmres.h"   // in coreres...

#include "carray.h"

#include "ppffplin.h"
#include "ppbin.h"
#include "ppffsrc.h"


#include "hxheap.h"
// Debug builds: HX_THIS_FILE names this translation unit.
// NOTE(review): presumably consumed by the hxheap.h allocation tracking -- confirm.
#ifdef _DEBUG
#undef HX_THIS_FILE           
static char HX_THIS_FILE[] = __FILE__;
#endif

// Sync trace builds only: one trace file shared by every CPurePlaySource.
// It is opened by the constructor and closed by the destructor.
#ifdef _SYNC_TRACE
FILE* m_sfile = NULL;
#endif      // _SYNC_TRACE


/*
 *  Returns the scheduler's current time expressed in milliseconds
 *  (seconds * 1000 + microseconds / 1000).
 *  Asserts and returns 0 when no scheduler is supplied.
 */
UINT32 GetCurrentSchedulerTimeInMsec(IHXScheduler* pScheduler)
{
    if (pScheduler)
    {
        HXTimeval now = pScheduler->GetCurrentSchedulerTime();
        return now.tv_sec * 1000 + now.tv_usec / 1000;
    }

    // a missing scheduler is a programming error
    HX_ASSERT(FALSE);
    return 0;
}


/******************************************************************
*  CPurePlaySource::CPurePlaySource
*
*  Stores the owning plugin, the stream header, the stream number and
*  the RTP/HX timestamp conversion factors, takes a reference on the
*  plugin and on the header, and classifies the stream (audio, video,
*  image map, event or other) from the header's "MimeType" property.
*
*  Network objects, the scheduler, the RTCP interval and the rule
*  table are set up later by Init().
*
*  pPlugin      - owning file format plugin (must not be NULL)
*  pHeader      - stream header (must not be NULL)
*  iStream      - stream number reported back to the plugin
*  ulRTPFactor  - stored in m_ulRTPFactor
*  ulHXFactor   - stored in m_ulHXFactor
*  bIsRMSource  - TRUE selects the RealMedia MIME type comparison,
*                 FALSE the generic "audio/" / "video/" prefix check
*/

CPurePlaySource::CPurePlaySource
(
    CPurePlayFileFormat*    pPlugin,
    IHXValues*              pHeader,
    UINT16            iStream,
    UINT32            ulRTPFactor,
    UINT32            ulHXFactor,
    BOOL              bIsRMSource
 )
 : m_lRefCount(0)
 , m_pPlugin(pPlugin)
 , m_pHeader(pHeader)
 , m_iStream(iStream)
 , m_pNet(NULL)
 , m_pResolver(NULL)
 , m_pRTPSock(NULL)
 , m_pRTCPSock(NULL)
 , m_pRTPResp(NULL)
 , m_pRTCPResp(NULL)
 , m_chRTPPayload(-1)
 , m_chTTL(DEFAULT_TTL)
 , m_uMultiPort(0)
 , m_ulMultiAddr(0)
 , m_ulInterfaceAddr(0)
 , m_pRules(NULL)
 , m_cRules(0)
 , m_bInitialPacket(TRUE)
 , m_ulRTPFactor(ulRTPFactor)
 , m_ulHXFactor(ulHXFactor)
 , m_ulLost(0)
 , m_bBeginPending(FALSE)
 , m_lHXOffset(0)
 , m_ulClipBandwidthID(0)
 , m_ulBandwidth(64000)
 , m_hStatusCallbackID(0)
 , m_ulThisSrcID(0)
 , m_pRTCPInterval(NULL)
 , m_pCName(NULL)
 , m_pName(NULL)
 , m_pTool(NULL)
 , m_pEmail(NULL)
 , m_bClosed(FALSE)
 , m_algorithm(ALGORITHM_B)
 , m_pByePkt(NULL)
 , m_bRTCPSent(FALSE)
 , m_ulBeginClockTick(0)
 , m_bTimeStampDelivery(FALSE)
 , m_bIsRMSource(bIsRMSource)
 , m_bIsEncryptedSource(FALSE)
 , m_ulStartedSourceCount(0)
 , m_ulHXMasterTime(0)
 , m_lHXOffsetToMaster(0)
 , m_bSyncDistributed(FALSE)
 , m_ulHXAnchorTime(0)
 , m_ulNTPHXTime(0)
 , m_bSyncAnchorDistributed(FALSE)
 , m_ulStartTime(0)
 , m_bStartTimeDistributed(FALSE)
 , m_StreamType(CStream::STREAM_TYPE_OTHER)
{
    // m_pScheduler is only acquired in Init(), but Cleanup() tests and
    // releases it unconditionally, so it must never hold garbage.
    m_pScheduler = NULL;

    // (iStream and the two factors are unsigned; there is nothing to
    // range-check on them.)
    HX_ASSERT(m_pHeader != NULL);
    HX_ASSERT(m_pPlugin != NULL);

#ifdef XXXGo_DEBUG
    static INT32 lCount = 1;
    char cFileName[30]; /* Flawfinder: ignore */
    memset(cFileName, 0, 30);
    SafeSprintf(cFileName, 30, "c:\\temp\\fflog%d.txt", lCount);
    m_pFFLog = fopen(cFileName, "wt");

    memset(cFileName, 0, 30);
    SafeSprintf(cFileName, 30, "c:\\temp\\ppff%d.txt", lCount++);
    m_pLogFile = fopen(cFileName, "wt");
    if (m_pLogFile)
    {
        // fopen() may fail (e.g. the directory does not exist)
        fprintf(m_pLogFile, "File created\n");
    }
#endif

    m_pHeader->AddRef();
    m_pPlugin->AddRef();

    // Determine the stream type from the MIME type, if the header has one.
    // Without one the stream keeps its initial STREAM_TYPE_OTHER.
    IHXBuffer* pBuffer = NULL;
    HX_RESULT retVal = pHeader->GetPropertyCString("MimeType", pBuffer);

    if (SUCCEEDED(retVal) && pBuffer)
    {
        const char* pMimeType = (const char*) pBuffer->GetBuffer();

        if (m_bIsRMSource)
        {
            CHXString mimeTypeString;

            mimeTypeString = pMimeType;
            m_bIsEncryptedSource = decryptMimeType(mimeTypeString);

            if (!hxMimeRootCmp(pMimeType, REALAUDIO_MIME_TYPE) ||
                !hxMimeRootCmp(pMimeType, REALAUDIO_MULTIRATE_MIME_TYPE) ||
                !hxMimeRootCmp(pMimeType, REALAUDIO_MULTIRATE_LIVE_MIME_TYPE))
            {
                m_StreamType = CStream::STREAM_TYPE_AUDIO;
            }
            else if (!hxMimeRootCmp(pMimeType, REALVIDEO_MIME_TYPE) ||
                     !hxMimeRootCmp(pMimeType, REALVIDEO_MULTIRATE_MIME_TYPE))
            {
                m_StreamType = CStream::STREAM_TYPE_VIDEO;
            }
            else if (!hxMimeRootCmp(pMimeType, IMAGEMAP_MIME_TYPE) ||
                     !hxMimeRootCmp(pMimeType, REALIMAGEMAP_MIME_TYPE) ||
                     !hxMimeRootCmp(pMimeType, REALIMAGEMAP_MULTIRATE_MIME_TYPE))
            {
                m_StreamType = CStream::STREAM_TYPE_IMGMAP;
            }
            else if (!hxMimeRootCmp(pMimeType, SYNCMM_MIME_TYPE) ||
                     !hxMimeRootCmp(pMimeType, REALEVENT_MIME_TYPE) ||
                     !hxMimeRootCmp(pMimeType, REALEVENT_MULTIRATE_MIME_TYPE))
            {
                m_StreamType = CStream::STREAM_TYPE_EVENT;
            }
            else
            {
                m_StreamType = CStream::STREAM_TYPE_OTHER;
            }
        }
        else
        {
            // generic source: only the top-level media type is examined
            if ((strlen(pMimeType) >= AUDIO_MIME_PREFIX_SIZE) &&
                (strncasecmp(pMimeType,
                             AUDIO_MIME_PREFIX,
                             AUDIO_MIME_PREFIX_SIZE) == 0))
            {
                m_StreamType = CStream::STREAM_TYPE_AUDIO;
            }
            else if ((strlen(pMimeType) >= VIDEO_MIME_PREFIX_SIZE) &&
                     (strncasecmp(pMimeType,
                                  VIDEO_MIME_PREFIX,
                                  VIDEO_MIME_PREFIX_SIZE) == 0))
            {
                m_StreamType = CStream::STREAM_TYPE_VIDEO;
            }
            else
            {
                m_StreamType = CStream::STREAM_TYPE_OTHER;
            }
        }
    }

#ifdef _SYNC_TRACE
    if (!m_sfile)
    {
        m_sfile = fopen("c:\\live_s.txt", "wt");
    }
#endif      // _SYNC_TRACE

    HX_RELEASE(pBuffer);
 }
 
 // Destructor: frees the rule table and the RTCP interval object created
 // in Init(), then closes any trace/log files opened by debug builds.
 // References to other objects are dropped in Cleanup(), not here.
 CPurePlaySource::~CPurePlaySource()
{
    HX_VECTOR_DELETE(m_pRules);
    HX_DELETE(m_pRTCPInterval);

#ifdef _SYNC_TRACE
    // the sync trace file is shared at file scope
    if (m_sfile != NULL)
    {
        fclose(m_sfile);
        m_sfile = NULL;
    }
#endif      // _SYNC_TRACE

#ifdef XXXGo_DEBUG
    if (m_pLogFile != NULL)
    {
        fprintf(m_pLogFile, "~CPurePlaySource()\n");
        fclose(m_pLogFile);
        m_pLogFile = NULL;
    }

    if (m_pFFLog != NULL)
    {
        fclose(m_pFFLog);
        m_pFFLog = NULL;
    }
#endif
}
 
 // *** IUnknown methods ***
 /////////////////////////////////////////////////////////////////////////
 //  Method:
 //   IUnknown::QueryInterface
 //  Purpose:
 //   Hands out IUnknown and IHXResolverResponse on this object.
 //   Requests for IHXTransportSyncServer are forwarded to the owning
 //   plugin when there is one.  Anything else yields a NULL pointer
 //   and HXR_NOINTERFACE.
 //
 STDMETHODIMP CPurePlaySource::QueryInterface(REFIID riid, void** ppvObj)
 {
     void* pInterface = NULL;

     if (IsEqualIID(riid, IID_IUnknown))
     {
         pInterface = this;
     }
     else if (IsEqualIID(riid, IID_IHXResolverResponse))
     {
         pInterface = (IHXResolverResponse*)this;
     }
     else if (IsEqualIID(riid, IID_IHXTransportSyncServer) && m_pPlugin)
     {
         // the plugin implements sync serving on our behalf
         return m_pPlugin->QueryInterface(riid, ppvObj);
     }

     if (!pInterface)
     {
         *ppvObj = NULL;
         return HXR_NOINTERFACE;
     }

     AddRef();
     *ppvObj = pInterface;
     return HXR_OK;
 }
 
 /////////////////////////////////////////////////////////////////////////
 //  Method:
 //   IUnknown::AddRef
 //  Purpose:
 //   Increments the reference count with InterlockedIncrement and
 //   returns the resulting count.
 //
 STDMETHODIMP_(ULONG32) CPurePlaySource::AddRef()
 {
     const ULONG32 ulNewCount = InterlockedIncrement(&m_lRefCount);
     return ulNewCount;
 }
 
 /////////////////////////////////////////////////////////////////////////
 //  Method:
 //   IUnknown::Release
 //  Purpose:
 //   Decrements the reference count and deletes the object when the
 //   count drops to zero.  Returns the count remaining after this call
 //   (0 when the object has been deleted).
 //
 STDMETHODIMP_(ULONG32) CPurePlaySource::Release()
 {
     // Use the value returned by the decrement itself.  Reading
     // m_lRefCount again afterwards could report a count changed by
     // another thread, or touch the object after another thread's
     // Release() has already deleted it.
     const LONG32 lRemaining = InterlockedDecrement(&m_lRefCount);
     if (lRemaining > 0)
     {
         return (ULONG32)lRemaining;
     }

     delete this;
     return 0;
 }


/*
 *  CPurePlaySource::Init
 *
 *  First half of the source initialization:
 *    - acquires the network services and the scheduler from pContext,
 *    - reads "AvgBitRate", the RTP payload type and the multicast TTL
 *      from the stream header,
 *    - creates the RTCP interval object using the smaller of the stream
 *      bit rate and the "Bandwidth" preference,
 *    - parses the "ASMRuleBook" (OnDepend lists and TimeStampDelivery),
 *    - adjusts the preroll / predata properties of the header,
 *    - fetches the RTCP description strings from the plugin,
 *    - starts resolving pMultiAddr.
 *  The second half runs in GetHostByNameDone() once the resolver answers.
 *
 *  pContext         - context to query the services from; only used
 *                     during this call
 *  pMultiAddr       - multicast address (host name or dotted form)
 *  ulInterfaceAddr  - local interface address to bind/join on
 *  uMultiPort       - RTP port; RTCP uses uMultiPort + 1
 *
 *  Returns HXR_OK when the resolve has been started, HXR_FAIL when a
 *  required service or the RTP payload type is missing.
 */
HX_RESULT
CPurePlaySource::Init(IUnknown* pContext,
                  const char* pMultiAddr,
                  UINT32 ulInterfaceAddr,
                  UINT16 uMultiPort)
{
    HX_ASSERT(pContext != NULL);
    if (!pContext)
    {
        return HXR_FAIL;
    }
    pContext->AddRef();

    pContext->QueryInterface(IID_IHXNetworkServices, (void**)&m_pNet);
    HX_ASSERT(m_pNet != NULL);

    pContext->QueryInterface(IID_IHXScheduler,  (void**)&m_pScheduler);
    HX_ASSERT(m_pScheduler != NULL);

    if (!m_pNet)
    {
        // the resolver and both sockets are created through m_pNet
        HX_RELEASE(pContext);
        return HXR_FAIL;
    }

    UINT32 ulBitRate = 0;
    if (HXR_OK == m_pHeader->GetPropertyULONG32("AvgBitRate", ulBitRate))
    {
        // this number is used as a divisor later, so it can't be 0
        m_ulBandwidth = (0 == ulBitRate) ? 1 : ulBitRate;
    }
    // else: keep the default set by the constructor

    ULONG32 ulPayload = 0;
    if (HXR_OK != m_pHeader->GetPropertyULONG32(PROP_RTP_PAYLOAD, ulPayload))
    {
        m_pPlugin->ReportError(IDS_ERR_SM_UNEXPECTEDPAYLOAD, HXLOG_ERR, HXR_FAIL);
        // drop the reference taken above before bailing out
        HX_RELEASE(pContext);
        return HXR_FAIL;
    }
    m_chRTPPayload = (INT8)ulPayload;

    UINT32 ulTTL = 0;
    if (HXR_OK == m_pHeader->GetPropertyULONG32(PROP_MULTI_TTL, ulTTL))
    {
        m_chTTL = (INT8)ulTTL;
    }
    // else: keep the default set by the constructor

    IHXPreferences* pPrefs = NULL;
    pContext->QueryInterface(IID_IHXPreferences, (void**)&pPrefs);
    HX_ASSERT(pPrefs != NULL);

    HX_RELEASE(pContext);

    // take the smaller of the two bandwidth figures for the RTCP interval
    // we don't waste any BW!
    UINT32 ulRTCPBandwidth = m_ulBandwidth;
    if (pPrefs)
    {
        IHXBuffer* pBandwidth = NULL;
        if (HXR_OK == pPrefs->ReadPref("Bandwidth", pBandwidth) && pBandwidth)
        {
            UINT32 ulBW = atol((const char*)pBandwidth->GetBuffer());
            if (ulBW < m_ulBandwidth)
            {
                ulRTCPBandwidth = ulBW;
            }
            // ReadPref handed us a reference; give it back
            HX_RELEASE(pBandwidth);
        }
        HX_RELEASE(pPrefs);
    }
    m_pRTCPInterval = new CRTCPInterval(ulRTCPBandwidth);

    /*
     * Deal with ASMRuleBook
     *
     * Get the dependencies from the rule book so we can tell when live
     * streams are sync'ed to the keyframe
     */
    IHXBuffer* pRuleBuf = NULL;

    if (HXR_OK != m_pHeader->GetPropertyCString("ASMRuleBook", pRuleBuf))
    {
        // no rule book: a single rule, delivered by time stamp
        m_pRules = new RuleInfo[1];
        m_cRules = 1;
        m_bTimeStampDelivery = TRUE;
    }
    else
    {
        ASMRuleBook rules((char*)pRuleBuf->GetBuffer());

        m_cRules = rules.GetNumRules();

        // get subscriptions for this bandwidth
        // (at least 128 entries as before, more for larger rule books)
        const UINT32 ulSubInfoCount = (m_cRules > 128) ? (UINT32)m_cRules : 128;
        BOOL* pSubInfo = new BOOL[ulSubInfoCount];
        memset(pSubInfo, 0, sizeof(BOOL) * ulSubInfoCount);

        IHXValues* pValues = new CHXHeader();
        pValues->AddRef();

        UINT8 pBW[128];
        sprintf((char*)pBW, "%lu", (unsigned long)m_ulBandwidth); /* Flawfinder: ignore */
        IHXBuffer* pBandwidth = new CHXBuffer();
        pBandwidth->AddRef();
        pBandwidth->Set(pBW, strlen((char*)pBW) + 1);

        pValues->SetPropertyCString("Bandwidth", pBandwidth);
        rules.GetSubscription(pSubInfo, pValues);

        HX_RELEASE(pBandwidth);
        HX_RELEASE(pValues);

        /*
         *  now get Dependencies and figure out TimeStampDelivery stream
         */
        IHXValues*  pRuleProps  = NULL;
        IHXBuffer*  pBuffer     = NULL;
        UINT16      iRule;
        m_pRules = new RuleInfo[m_cRules];

        for (iRule = 0; iRule < m_cRules; iRule++)
        {
            pRuleProps = NULL;
            rules.GetProperties(iRule, pRuleProps);
            HX_ASSERT(pRuleProps);
            if (!pRuleProps)
            {
                continue;
            }

            pBuffer = NULL;
            if (SUCCEEDED(pRuleProps->GetPropertyCString("OnDepend", pBuffer)))
            {
                // "OnDepend" is a comma separated list of rule numbers;
                // characters other than digits and commas are ignored.
                // There can never be more entries than characters, so
                // size + 1 slots are always enough.
                const UINT32 ulDepBufSize = pBuffer->GetSize();
                UINT16* pOnDeps = new UINT16[ulDepBufSize + 1];
                memset(pOnDeps, 0, sizeof(UINT16) * (ulDepBufSize + 1));
                UINT16 ulNum = 0;
                const UINT8* pTemp = pBuffer->GetBuffer();

                for (UINT32 ulPos = 0; ulPos < ulDepBufSize && pTemp[ulPos]; ulPos++)
                {
                    const UINT8 ch = pTemp[ulPos];
                    if (ch == ',')
                    {
                        ulNum++;
                        pOnDeps[ulNum] = 0;
                    }
                    else if ((ch >= '0') && (ch <= '9'))
                    {
                        pOnDeps[ulNum] *= 10;
                        pOnDeps[ulNum] += ch - '0';
                    }
                }
                ulNum++;

                // copy the parsed entries and terminate the list
                m_pRules[iRule].m_pOnDepends = new UINT16[ulNum + 1];
                memcpy(m_pRules[iRule].m_pOnDepends, pOnDeps, /* Flawfinder: ignore */
                    ulNum * sizeof(UINT16));

                m_pRules[iRule].m_pOnDepends[ulNum] = PP_DEPEND_END;

                HX_RELEASE(pBuffer);
                HX_VECTOR_DELETE(pOnDeps);
            }

            if (TRUE == pSubInfo[iRule] && !m_bTimeStampDelivery)
            {
                // don't have to do this twice!
                if (SUCCEEDED(pRuleProps->GetPropertyCString("TimeStampDelivery", pBuffer)))
                {
                    if ((pBuffer->GetBuffer()[0] == 'T') ||
                        (pBuffer->GetBuffer()[0] == 't'))
                    {
                        m_bTimeStampDelivery = TRUE;
                    }

                    HX_RELEASE(pBuffer);
                }
            }

            HX_RELEASE(pRuleProps);
        }

        HX_VECTOR_DELETE(pSubInfo);
        HX_RELEASE(pRuleBuf);
    }

    if (!m_bIsRMSource)
    {
        // Treat it as time stamp delivered
        m_bTimeStampDelivery = TRUE;

        // add a default preroll if it is not in the header already because
        // SDPs from non-rmserver will not have this.
        UINT32 ulVal = 0;
        if (HXR_OK != m_pHeader->GetPropertyULONG32("Preroll", ulVal))
        {
            m_pHeader->SetPropertyULONG32("Preroll", PP_SYNC_TIMEOUT_MS);
        }
        if (HXR_OK != m_pHeader->GetPropertyULONG32("HasOutOfOrderTS", ulVal))
        {
            m_pHeader->SetPropertyULONG32("HasOutOfOrderTS", 1);
        }
    }

    /*
     * The core must be forced to buffer until at least one
     * packet from each stream is received since the core
     * does not generate properly offset OnTimeSync() calls
     * to the renderers until such time.
     */
    ULONG32 ulVal = 0;
    // We want to use predata to force buffering - see if it is already set
    m_pHeader->GetPropertyULONG32("PreDataAtStart", ulVal);
    if (ulVal)
    {
        // If PreDataAtStart is already used, make sure Predata is not 0
        m_pHeader->GetPropertyULONG32("Predata", ulVal);
    }
    if (!ulVal)
    {
        // Predata is not used for buffering, use it as long as any preroll
        // is wanted on this stream
        m_pHeader->GetPropertyULONG32("Preroll", ulVal);

        if (ulVal)
        {
            m_pHeader->SetPropertyULONG32("PreDataAtStart", 1);
            m_pHeader->SetPropertyULONG32("PreDataAfterSeek", 1);
            m_pHeader->SetPropertyULONG32("PrerollAtStart", 1);
            m_pHeader->SetPropertyULONG32("PrerollAfterSeek", 1);

            ulVal = 0;
            m_pHeader->GetPropertyULONG32("Predata", ulVal);
            if (ulVal == 0)
            {
                ulVal = 1;
                m_pHeader->SetPropertyULONG32("Predata", ulVal);
            }
        }
    }

    /*
     * Get the strings to send in RTCP receiver reports from the plugin
     */
    m_pPlugin->GetCName(m_pCName);
    m_pPlugin->GetUserName(m_pName);
    m_pPlugin->GetTool(m_pTool);
    m_pPlugin->GetEmailName(m_pEmail);

    m_pNet->CreateResolver(&m_pResolver);
    HX_ASSERT(m_pResolver != NULL);
    if (!m_pResolver)
    {
        return HXR_FAIL;
    }

    m_uMultiPort = uMultiPort;
    m_ulInterfaceAddr = ulInterfaceAddr;

    // initialize the bin
    // 5 RTCP interval + 1 for the ones to be removed
    m_MemberTimeoutBins.Init(6);
    m_MemberTimeoutBins.UpdateBins();

    /*
     * Finish the Initialization in GetHostByNameDone
     */
    m_pResolver->Init(this);
    m_pResolver->GetHostByName(pMultiAddr);

    return HXR_OK;
}

 /*
  *  CPurePlaySource::GetHostByNameDone
  *
  *  Resolver callback that finishes the initialization started in Init().
  *  On success it creates, binds and joins the RTP socket on m_uMultiPort
  *  and the RTCP socket on m_uMultiPort + 1, then calls Begin().
  *
  *  On any failure it records "<group>/<port> on <interface>" in the
  *  plugin's m_strLastErr (the port is the RTCP one when the RTP socket
  *  had already been set up), drops both sockets and their response
  *  handlers, asks the plugin to try unicast and closes the plugin.
  *
  *  status  - result of the name resolution
  *  ulAddr  - resolved multicast address
  *
  *  Returns HXR_OK on success, otherwise the first error encountered.
  */
 STDMETHODIMP CPurePlaySource::GetHostByNameDone
 (
     HX_RESULT status, 
     UINT32 ulAddr
 )
 {
    HX_RESULT     theErr = HXR_OK;
    BOOL    bRTPSockOK = FALSE;
    IHXSetSocketOption*    pSockOpt  = NULL;
    int     nFailedPort = 0;

    if (status != HXR_OK)
    {
        m_pPlugin->StreamDone(status, m_iStream);
        theErr = status;
        goto bail;
    }

    /*
     * Create RTP socket for RTP messages on the port passed
     */
    m_ulMultiAddr = ulAddr;

    m_pNet->CreateUDPSocket(&m_pRTPSock);
    HX_ASSERT(m_pRTPSock != NULL);
    if (!m_pRTPSock)
    {
        theErr = HXR_FAIL;
        goto bail;
    }

    m_pRTPResp = new CRTPResponseHandler(this, RTP_PORT);
    m_pRTPResp->AddRef();

    theErr = m_pRTPSock->Init(m_ulInterfaceAddr, m_uMultiPort, m_pRTPResp);
    if (theErr != HXR_OK) goto bail;

    // set option before it binds (the option interface is optional)
    m_pRTPSock->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt);
    if (pSockOpt)
    {
        pSockOpt->SetOption(HX_SOCKOPT_REUSE_ADDR, TRUE);
        pSockOpt->SetOption(HX_SOCKOPT_REUSE_PORT, TRUE);
    }

#ifdef _UNIX
    theErr = m_pRTPSock->Bind(HX_INADDR_ANY, m_uMultiPort);
#else
    theErr = m_pRTPSock->Bind(m_ulInterfaceAddr, m_uMultiPort);
#endif
    if (theErr != HXR_OK) goto bail;

    theErr = m_pRTPSock->JoinMulticastGroup(m_ulMultiAddr,
                                            m_ulInterfaceAddr);
    if (theErr != HXR_OK) goto bail;

    if (pSockOpt)
    {
        pSockOpt->SetOption(HX_SOCKOPT_MULTICAST_IF, m_ulInterfaceAddr);
        HX_RELEASE(pSockOpt);
    }
    bRTPSockOK = TRUE;

#ifdef XXXGo_DEBUG
    if (m_pFFLog)
    {
        fprintf(m_pFFLog, "RTP IGMP join %u on %u\n", m_ulMultiAddr, 
            m_ulInterfaceAddr);
    }
#endif

    /*
     * RTCP messages come in on the next port higher than the RTP
     * port.  We need to send messages on this so get the TTL and
     * set it on this socket only.
     */
    m_pNet->CreateUDPSocket(&m_pRTCPSock);
    HX_ASSERT(m_pRTCPSock != NULL);
    if (!m_pRTCPSock)
    {
        theErr = HXR_FAIL;
        goto bail;
    }

    m_pRTCPResp = new CRTPResponseHandler(this, RTCP_PORT);
    m_pRTCPResp->AddRef();

    theErr = m_pRTCPSock->Init(m_ulInterfaceAddr, m_uMultiPort + 1, m_pRTCPResp);
    if (theErr != HXR_OK) goto bail;

    // set option before it binds (the option interface is optional)
    m_pRTCPSock->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt);
    if (pSockOpt)
    {
        pSockOpt->SetOption(HX_SOCKOPT_REUSE_ADDR, TRUE);
        pSockOpt->SetOption(HX_SOCKOPT_REUSE_PORT, TRUE);
    }

#ifdef _UNIX
    theErr = m_pRTCPSock->Bind(HX_INADDR_ANY, m_uMultiPort + 1);
#else
    theErr = m_pRTCPSock->Bind(m_ulInterfaceAddr, m_uMultiPort + 1);
#endif
    if (theErr != HXR_OK) goto bail;

    theErr = m_pRTCPSock->JoinMulticastGroup(m_ulMultiAddr,
                                             m_ulInterfaceAddr);
    if (theErr != HXR_OK) goto bail;

    if (pSockOpt)
    {
        // set multicast interface so we will send RTCP pkts on the right 
        // interface.
        pSockOpt->SetOption(HX_SOCKOPT_MULTICAST_IF, m_ulInterfaceAddr);
        HX_RELEASE(pSockOpt);
    }

#ifdef XXXGo_DEBUG
    if (m_pFFLog)
    {
        fprintf(m_pFFLog, "RTCP IGMP join %u on %u\n", m_ulMultiAddr, 
            m_ulInterfaceAddr);
    }
#endif

    Begin();

    return HXR_OK;

bail:
    /*
     *      XXXGo - If one of interfaces succeed, we don't want to end this whole
     *      presentation....Need to have a special clean up sequence...
     */

    // report the port of the socket that could not be set up
    nFailedPort = bRTPSockOK ? (m_uMultiPort + 1) : m_uMultiPort;

    m_pPlugin->m_strLastErr.Format("%d.%d.%d.%d/%d on %d.%d.%d.%d",
               (ulAddr >> 24) & 0xFF,
               (ulAddr >> 16) & 0xFF,
               (ulAddr >>  8) & 0xFF,
               (ulAddr      ) & 0xFF,
               nFailedPort,
               (m_ulInterfaceAddr >> 24) & 0xFF,
               (m_ulInterfaceAddr >> 16) & 0xFF,
               (m_ulInterfaceAddr >>  8) & 0xFF,
               (m_ulInterfaceAddr      ) & 0xFF);

    HX_RELEASE(pSockOpt);
    HX_RELEASE(m_pRTPSock);
    HX_RELEASE(m_pRTCPSock);
    HX_RELEASE(m_pRTPResp);
    HX_RELEASE(m_pRTCPResp);

    // we want to switch to unicast if possible.
    m_pPlugin->m_tryUnicast = SOCKET_FAIL;
    m_pPlugin->Close();

    return theErr;
 }
 
 /*
  *  CPurePlaySource::Begin
  *
  *  Two-step start.  The first call only records that a start is
  *  pending.  The next call stamps the begin tick and posts the first
  *  reads on the RTP and RTCP sockets.  GetHostByNameDone() calls this
  *  once the sockets are ready, so reading starts only after both that
  *  call and one other call have been made, in either order.
  */
 void 
     CPurePlaySource::Begin()
 {
     if (!m_bBeginPending)
     {
         // first of the two calls: just remember it
         m_bBeginPending = TRUE;
         return;
     }

     m_ulBeginClockTick = HX_GET_TICKCOUNT();
     m_bBeginPending = FALSE;
     m_pRTPSock->Read(1024);
     m_pRTCPSock->Read(1024);
 }


// Most cleaning up is done here.
//
// Removes the pending status callback, leaves the multicast group on both
// sockets, releases every interface this object holds, deletes all stream
// and member objects and finally drops one reference on this object.
// That last Release() balances the AddRef() made in Close() and may
// delete the object, so nothing may touch members after this returns.
void
CPurePlaySource::Cleanup(void)
{
    /*
     * Release the callback if it exists
     */
    if (m_hStatusCallbackID != 0)
    {
        if (m_pScheduler)
        {
            m_pScheduler->Remove(m_hStatusCallbackID);
            m_hStatusCallbackID = 0;
        }
    }

    HX_RELEASE(m_pHeader);
    HX_RELEASE(m_pNet);
    HX_RELEASE(m_pResolver);

    /*
     * Leave the multicast group before the sockets are released
     */
    if (m_pRTPSock)
    {
        m_pRTPSock->LeaveMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr);
#ifdef XXXGo_DEBUG
        // guard the file that is actually written to
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "RTP IGMP leave %u on %u\n", m_ulMultiAddr, 
                m_ulInterfaceAddr);
            fflush(m_pLogFile);
        }
#endif
    }

    if (m_pRTCPSock)
    {
        m_pRTCPSock->LeaveMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr);
#ifdef XXXGo_DEBUG
        // guard the file that is actually written to
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "RTCP IGMP leave %u on %u\n", m_ulMultiAddr,
                m_ulInterfaceAddr);
            fflush(m_pLogFile);
        }
#endif
    }

    HX_RELEASE(m_pRTPSock);
    HX_RELEASE(m_pRTPResp);

    HX_RELEASE(m_pRTCPSock);
    HX_RELEASE(m_pRTCPResp);

    HX_RELEASE(m_pCName);
    HX_RELEASE(m_pName);
    HX_RELEASE(m_pTool);
    HX_RELEASE(m_pEmail);

    HX_RELEASE(m_pScheduler);
    HX_RELEASE(m_pPlugin);

    // need to clean up Sources
    // First clean up the stream ID map.  Each stream found there is also
    // removed from the SSRC map so it is not deleted a second time below.
    // RemoveKey() may find nothing if the stream has already been taken
    // out of the SSRC map elsewhere (the original note names RemoveSender()).
    if (!m_StrmIdToStrm.IsEmpty())
    {
        CHXMapLongToObj::Iterator i;
        for (i = m_StrmIdToStrm.Begin(); i != m_StrmIdToStrm.End(); ++i)
        {
            HX_ASSERT(((CStream*)(*i))->m_bMadeCut);
            m_SsrcToStream.RemoveKey(((CStream*)(*i))->m_ulSSRC);
            delete (CStream*)(*i);
        }
        m_StrmIdToStrm.RemoveAll();
    }

    if (!m_SsrcToStream.IsEmpty())
    {
        // we have more source obj's in SSRC map.  They are the ones which 
        // didn't make the source admission timeout cut!
        CHXMapLongToObj::Iterator i;
        for (i = m_SsrcToStream.Begin(); i != m_SsrcToStream.End(); ++i)
        {
            delete (CStream*)(*i);
        }
        m_SsrcToStream.RemoveAll();
    }

    if (!m_SsrcToMember.IsEmpty())
    {
        CHXMapLongToObj::Iterator i;
        for (i = m_SsrcToMember.Begin(); i != m_SsrcToMember.End(); ++i)
        {
            delete (CMember*)(*i);
        }
        m_SsrcToMember.RemoveAll();
    }

    // all the obj have been deleted above...
    m_MemberTimeoutBins.DeleteAllBins();

    HX_RELEASE(m_pByePkt);
    // this Release will cause it to get deleted
    Release();
}
 
 /*
  *  CPurePlaySource::Close
  *
  *  Shuts the source down exactly once; later calls are no-ops.
  *  Takes a reference on this object, then either lets ScheduleBye()
  *  take over (when source admission is closed and a BYE could be
  *  scheduled) or runs Cleanup() right away.  Always returns HXR_OK.
  */
 HX_RESULT
     CPurePlaySource::Close()
{
#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "Source::Close()\n");
        fflush(m_pLogFile);
    }
#endif

    if (!m_bClosed)
    {
        // don't do this twice!!!
        m_bClosed = TRUE;

        AddRef();

        // If admission has not been closed we have never sent any RTCP,
        // so there is no reason to send a BYE msg.  If it has been closed
        // but no BYE packet will be sent, we also clean up right here.
        if (!m_pPlugin->IsSourceAdmissionClosed() || !ScheduleBye())
        {
            Cleanup();
        }
    }

    return HXR_OK;
}
 
 /*
  *  CPurePlaySource::GetHeader
  *
  *  Builds a new header for one stream: a copy of every ULONG32,
  *  CString and Buffer property of m_pHeader, with "StreamNumber" set to
  *  unStreamNumber and "LiveStream" set to 1.
  *
  *  pHeader         - receives the new header, already AddRef'ed
  *  unStreamNumber  - stream number to store in the copy
  */
 void CPurePlaySource::GetHeader(REF(IHXValues*) pHeader, 
                         UINT16 unStreamNumber)
 {
     // it's all the same, except stream number
     HX_ASSERT(m_pHeader != NULL);

     pHeader = new CHXHeader;
     pHeader->AddRef();

     const char* pPropName;
     HX_RESULT res;

     // first, ULONG32
     ULONG32 ulValue;
     res = m_pHeader->GetFirstPropertyULONG32(pPropName, ulValue);
     while (HXR_OK == res)
     {
         pHeader->SetPropertyULONG32(pPropName, ulValue);
         res = m_pHeader->GetNextPropertyULONG32(pPropName, ulValue);
     }

     // then, CString
     IHXBuffer* pString;
     res = m_pHeader->GetFirstPropertyCString(pPropName, pString);
     while (HXR_OK == res)
     {
         pHeader->SetPropertyCString(pPropName, pString);
         HX_RELEASE(pString);
         res = m_pHeader->GetNextPropertyCString(pPropName, pString);
     }

     // then, Buffer
     IHXBuffer* pData;
     res = m_pHeader->GetFirstPropertyBuffer(pPropName, pData);
     while (HXR_OK == res)
     {
         pHeader->SetPropertyBuffer(pPropName, pData);
         HX_RELEASE(pData);
         res = m_pHeader->GetNextPropertyBuffer(pPropName, pData);
     }

     pHeader->SetPropertyULONG32("StreamNumber", unStreamNumber);
     pHeader->SetPropertyULONG32("LiveStream", 1);

#ifndef _USE_MASTER_STREAM_SYNC_SCHEME
     if (!m_bIsRMSource)
     {
         // Non RM sources are RTCP paced and synchronized.
         // We must tell a renderer (audio) to pay close attention to
         // time stamps and produce appropriate gaps if needed to maintain
         // synchronization.
         pHeader->SetPropertyULONG32("NonContigTime", 1);
     }
#endif      // _USE_MASTER_STREAM_SYNC_SCHEME
 }

 
 /*
  *  Returns the m_bSyncOK flag recorded for rule uRule.
  *
  *  When no rule table exists (m_pRules == NULL) TRUE is returned, which
  *  matches the "no rules" policy of IsDependOK() instead of dereferencing
  *  a NULL table.
  */
 BOOL 
     CPurePlaySource::AreDependenciesOK(UINT16 uRule)
 {
     if (m_pRules == NULL)
     {
         return TRUE;
     }

     return m_pRules[uRule].m_bSyncOK;
 }
 
 /*
  *  Records that a packet carrying rule uRule was received with its
  *  "switch on" flag set (bOn != 0).  Nothing is recorded when bOn is
  *  FALSE.
  *
  *  The rule table is still asserted in debug builds, but a missing table
  *  is now tolerated in release builds instead of being dereferenced.
  */
 void 
     CPurePlaySource::RecvRule(BOOL bOn, UINT16 uRule)
 {
     HX_ASSERT(m_pRules != NULL);

     if (bOn && (m_pRules != NULL))
     {
         m_pRules[uRule].m_bRecvOn = TRUE;
     }
 }
 
 /*
  *  Stores bOK as the m_bSyncOK flag of rule uRule.
  *
  *  Does nothing when there is no rule table (m_pRules == NULL), so that a
  *  source without rules cannot cause a NULL dereference.
  */
 void 
     CPurePlaySource::SetSyncOK(UINT16 uRule, BOOL bOK)
 {
     if (m_pRules == NULL)
     {
         return;
     }

     m_pRules[uRule].m_bSyncOK = bOK;
 }
 
 /*
  *  Decides whether the dependencies of rule uRule are satisfied.
  *
  *    - no rule table at all            -> TRUE
  *    - rule has no "on" dependencies   -> the rule's own m_bRecvOn flag
  *    - otherwise                       -> TRUE only if every rule listed
  *                                         in m_pOnDepends (terminated by
  *                                         PP_DEPEND_END) has m_bRecvOn set
  *
  *  bOn is accepted for the caller's convenience but is not consulted.
  */
 BOOL 
     CPurePlaySource::IsDependOK(BOOL bOn, UINT16 uRule)
 {
     if (!m_pRules)
     {
         return TRUE;
     }

     UINT16* pOnDeps = m_pRules[uRule].m_pOnDepends;
     if (NULL == pOnDeps)
     {
         return m_pRules[uRule].m_bRecvOn;
     }

     // Walk the dependency list and stop at the first rule that has not
     // been received yet.
     BOOL bAllReceived = TRUE;
     while (bAllReceived && (*pOnDeps != PP_DEPEND_END))
     {
         bAllReceived = m_pRules[*pOnDeps].m_bRecvOn ? TRUE : FALSE;
         ++pOnDeps;
     }

     return bAllReceived;
 }


/*
 *  Records a synchronization value on this source and forwards it to the
 *  syncer of every stream in m_SsrcToStream.
 *
 *  ulDistributionMode selects what ulVal1/ulVal2 mean:
 *    DISTRIBUTE_SYNC        ulVal1 = master time, ulVal2 = offset to master
 *                           (reinterpreted as LONG32)
 *    DISTRIBUTE_SYNCANCHOR  ulVal1 = anchor time, ulVal2 = NTP time
 *    DISTRIBUTE_STARTTIME   ulVal1 = start time (ulVal2 unused)
 *
 *  The loop always runs one pass more than there are streams; that final
 *  pass has no stream, so the member state is recorded even when the map
 *  is empty.  Any other mode returns HXR_UNEXPECTED after the first pass.
 */
HX_RESULT CPurePlaySource::Distribute(ULONG32 ulDistributionMode,
                              ULONG32 ulVal1,
                              ULONG32 ulVal2)
{
    HX_RESULT retVal = HXR_OK;
    BOOL bHaveStream;
    CHXMapLongToObj::Iterator iter = m_SsrcToStream.Begin();

    do
    {
        CStream* pStream = NULL;

        // Advance past the current entry before it is used.
        bHaveStream = (iter != m_SsrcToStream.End());
        if (bHaveStream)
        {
            pStream = (CStream*)(*iter);
            ++iter;
        }

        if (DISTRIBUTE_SYNC == ulDistributionMode)
        {
            m_ulHXMasterTime = ulVal1;
            m_lHXOffsetToMaster = (LONG32) ulVal2;
            m_bSyncDistributed = TRUE;
            if (pStream)
            {
                pStream->m_Syncer.HandleMasterSync(m_ulHXMasterTime,
                                                   m_lHXOffsetToMaster);
            }
        }
        else if (DISTRIBUTE_SYNCANCHOR == ulDistributionMode)
        {
            m_ulHXAnchorTime = ulVal1;
            m_ulNTPHXTime = ulVal2;
            m_bSyncAnchorDistributed = TRUE;
            if (pStream)
            {
                pStream->m_Syncer.AnchorSync(m_ulHXAnchorTime,
                                             m_ulNTPHXTime);
            }
        }
        else if (DISTRIBUTE_STARTTIME == ulDistributionMode)
        {
            m_ulStartTime = ulVal1;
            m_bStartTimeDistributed = TRUE;
            if (pStream)
            {
                pStream->m_Syncer.SetStartTime(ulVal1, TRUE);
            }
        }
        else
        {
            // Unknown mode: nothing to distribute.
            retVal = HXR_UNEXPECTED;
        }
    } while (bHaveStream && (HXR_OK == retVal));

    return retVal;
}


/*
 *  Handles the completion of a read on the RTP socket.
 *
 *  status - result of the read.  Anything other than HXR_OK closes the
 *           plugin and returns HXR_FAIL.
 *  pBuf   - buffer holding one raw RTP packet.
 *
 *  The packet is unpacked, matched to (or registered as) a stream by SSRC,
 *  validated (payload type, version, sequence number), used to update the
 *  jitter estimate and statistics, converted into a CHXPacket (RM source)
 *  or CHXRTPPacket (non-RM source), queued on the stream's transport
 *  buffer and finally the plugin is asked to process pending packets.
 *
 *  Returns HXR_OK in every case except a failed read.  Apart from the
 *  "plugin is Closed" early return, every HXR_OK path issues another
 *  Read(1024) on the RTP socket so reception continues.
 */
HX_RESULT CPurePlaySource::HandleRTPMsg(HX_RESULT status, IHXBuffer* pBuf)
{
    if (m_pPlugin->m_state == Closed)
    {// it is closing...
        return HXR_OK;
    }

    if (status != HXR_OK)
    {
#ifdef XXXGo_DEBUG
        if (m_pFFLog)
        {
            fprintf(m_pFFLog, "this is why!!!\n");
        }
#endif
        HX_ASSERT(FALSE);
        m_pPlugin->Close();
        return HXR_FAIL;
    }

    HX_ASSERT(pBuf != NULL);
    if (!pBuf)
    {
        // Nothing to parse; keep the receive loop alive instead of
        // dereferencing a NULL buffer in release builds.
        if (m_pRTPSock)
        {
            m_pRTPSock->Read(1024);
        }
        return HXR_OK;
    }

    IHXPacket*  pPacket     = NULL;
    IHXBuffer*  pDataBuf    = NULL;
    RTPPacket   pkt;
    UINT16      uASMRule    = 0;
    UINT8       chASMFlags  = HX_ASM_SWITCH_ON;
    CStream*    pStrm       = NULL;
    ULONG32     ulTimeNow   = HX_GET_BETTERTICKCOUNT();
    ULONG32     ulHXTime    = 0;
    ULONG32     ulRTPTime   = 0;
    BOOL        bWasInProbation;

    /*
     * Convert this RTP packet into an IHXPacket
     */
    pkt.unpack(pBuf->GetBuffer(), pBuf->GetSize());

    if (!GetStreamBySSRC(pkt.ssrc, pStrm))
    {
        // never seen this ssrc.  add it to the map
        if (!AddNewEntryToMap(pkt.ssrc, pStrm))
        {
            m_pRTPSock->Read(1024);
            return HXR_OK;
        }
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "HandleRTP: adding a new entry with ssrc = %u\n", 
                    pkt.ssrc);
            fflush(m_pLogFile);
        }
#endif
    }
    HX_ASSERT(pStrm);

    // NOTE(review): the first-packet check further down treats
    // m_chRTPPayload == -1 as "payload unknown"; this check has no such
    // exception, so it looks like every packet is rejected in that case.
    // Confirm which behaviour is intended.
    if (pkt.payload != m_chRTPPayload)
    {
        // we have to ignore any packet whose payload does not match the 
        // one in SDP file.
        m_pRTPSock->Read(1024);
        HX_ASSERT(FALSE && "Received packed with different payload type on the same stream");     
        return HXR_OK;
    }

    // yes, we heard from you!
    pStrm->m_bHeardSinceLastTime = TRUE;
    pStrm->m_ulNumRRIntervals    = 0;

    if (!pStrm->m_bMadeCut)
    {
        m_pRTPSock->Read(1024);
        return HXR_OK;
    }

    bWasInProbation = pStrm->IsInProbation();

    // check version and seq#
    if ((RTP_VERSION != pkt.version_flag) || 
        !pStrm->UpdateSeqNum((UINT16)pkt.seq_no, pkt.timestamp))
    {
        // invalid!
        m_pRTPSock->Read(1024);
        return HXR_OK;
    }

    // The stream just left probation with this packet: count it as started.
    if (bWasInProbation && (!pStrm->IsInProbation()))
    {
        m_ulStartedSourceCount++;
    }

    /************************
     * calculate jitter (rfc 1889 Appendix A.8)
     *
     *     J += (|D| - J) / 16
     *
     * The correction term is negative whenever |D| < J.  It is therefore
     * computed as a signed value; converting a negative double straight to
     * an unsigned integer is undefined behaviour.
     */
    INT32 lTransit = ulTimeNow / 1000 - pkt.timestamp;
    INT32 lD = lTransit - pStrm->m_ulTransit;
    pStrm->m_ulTransit = lTransit;
    if (lD < 0)
    {
        lD = -lD;
    }
    INT32 lJitterDelta =
        (INT32)((1./16.) * ((double)lD - (double)pStrm->m_ulJitter));
    pStrm->m_ulJitter += lJitterDelta;

    /*
     * Store off the initial time value of the first packet
     */
    if (pStrm->m_bInitialPkt)
    {
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "ssrc=%u, num sources = %d\n", pkt.ssrc, 
                    GetNumSrc());
            fflush(m_pLogFile);
        }
#endif
        pStrm->m_bInitialPkt = FALSE;

        pStrm->m_ulInitialRTPTime = pkt.timestamp; 
        pStrm->m_ulStartTimeMS = ulTimeNow;
        pStrm->m_ulStatStartMS = ulTimeNow;

        if (!IsRMSource())
        {
            // Seed the syncer with this packet and remember the converted
            // time stamps of the first packet.
            pStrm->m_Syncer.SetStartTime(ulTimeNow);
            pStrm->m_Syncer.SetStartSync(pkt.timestamp,
                                         STARTING_HX_TIMESTAMP,
                                         TRUE,
                                         ulTimeNow);

            ulHXTime = pStrm->m_Syncer.RTP2SyncHX(pkt.timestamp);
            ulRTPTime = pStrm->m_Syncer.RTP2SyncRTP(pkt.timestamp);
            pStrm->m_ulLastRawRTPTS = pkt.timestamp;
            pStrm->m_ulLastRTPTS = ulRTPTime;
            pStrm->m_ulLastHXTS = ulHXTime;
        }
    }

    /*
     * Update stats with Packet size plus UDP overhead (8) and IP (20)
     */
    pStrm->SetStats(ulTimeNow, pBuf->GetSize() + 28);

    // XXXGo
    // we need this
    // this is the very first pkt of this "stream"
    if (m_bInitialPacket)
    {
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "Very First Pk ssrc=%u\n", pkt.ssrc);
            fflush(m_pLogFile);
        }
#endif

        // if we have failed to obtain payload from the stream header,
        // m_chRTPPayload is -1
        if (m_chRTPPayload != -1 && m_chRTPPayload != pkt.payload)
        {
            // payload of packet received is not the same as the one in sdp 
            // we have to ignore this.
            m_pRTPSock->Read(1024);
            return HXR_OK;
        }

        m_bInitialPacket = FALSE;

        /*
         * Schedule the initial RTCP receiver report
         */
        UINT32 ulNextTime = (UINT32) 
                            (m_pRTCPInterval->GetRTCPInterval(
                                        m_pRTCPInterval->GetRTCPBW(), 
                                        m_SsrcToMember.GetCount(),
                                        GetTrueNumSrc())
                            * 1000.0);

        Schedule(ulNextTime, RTCP_SR);
    }

    HX_ASSERT(pkt.ssrc == ((INT32) pStrm->m_ulSSRC));

    /*
     * Convert the timestamp back to RMA format from RTP format
     * if non RealMedia presentation
     */
    if (IsRMSource())
    {
        // RealMedia source
        ulHXTime = (ULONG32) pkt.timestamp;

#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "transfering ssrc(%u): TS(%u) -> %u\n", 
                    pkt.ssrc, pkt.timestamp, ulHXTime);
            fflush(m_pLogFile);
        }
#endif

        // ASM rule and flags travel in the RTP header extension.
        if ((pkt.extension_flag == 1) && (pkt.op_code == RTP_OP_ASMRULES))
        {
            chASMFlags = (UINT8) pkt.asm_flags;
            uASMRule = pkt.asm_rule;
        }
    }
    else
    {
        // Non-RealMedia source

        // Wait for wall clock synchronization
        if (!pStrm->m_Syncer.IsStartSyncSet())
        {
            // We can't convert to synchronized time stamp yet or
            // the synchronization point is not close enough to be accurate
#ifdef XXXGo_DEBUG
            if (m_pLogFile)
            {
                fprintf(m_pLogFile, "Ignoring packet before RTCP sync: ssrc(%u): TS(%u)\n", 
                        pkt.ssrc, pkt.timestamp);
                fflush(m_pLogFile);
            }
#endif
            m_pRTPSock->Read(1024);
            return HXR_OK;
        }

        // Convert from RTP to synchronized RMA time
        if (pStrm->m_ulLastRawRTPTS == pkt.timestamp)
        {
            // We need to make sure the RTCP synchronization does not
            // cause subsequent, same time-stamped packets to take on
            // different time stamps as this may compromise payload
            // integrity.
            ulRTPTime = pStrm->m_ulLastRTPTS;
            ulHXTime = pStrm->m_ulLastHXTS;
        }
        else
        {
            ulHXTime = pStrm->m_Syncer.RTP2SyncHX(pkt.timestamp);
            ulRTPTime = pStrm->m_Syncer.RTP2SyncRTP(pkt.timestamp);
            pStrm->m_ulLastRawRTPTS = pkt.timestamp;
            pStrm->m_ulLastRTPTS = ulRTPTime;
            pStrm->m_ulLastHXTS = ulHXTime;
        }

#ifdef _SYNC_TRACE
        if (m_sfile)
        {
            fprintf(m_sfile, "adjusting ssrc(%u): TS(%u) -> %u\n",
                    pkt.ssrc, pkt.timestamp, ulHXTime);
            fflush(m_sfile);
        }
#endif      // _SYNC_TRACE

#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "adjusting ssrc(%u): TS(%u) -> %u\n", 
                    pkt.ssrc, pkt.timestamp, ulHXTime);
            fflush(m_pLogFile);
        }
#endif

        // The RTP marker bit selects rule 1, otherwise rule 0.
        uASMRule = pkt.marker_flag ? 1 : 0;
        chASMFlags = HX_ASM_SWITCH_ON;
    }

    pDataBuf = new CHXBuffer;
    pDataBuf->AddRef();

    pDataBuf->Set((unsigned char*)pkt.data.data, pkt.data.len);

    if (m_bIsRMSource)
    {
        pPacket = new CHXPacket;
        pPacket->AddRef();

        pPacket->Set(pDataBuf, 
                     ulHXTime,
                     pStrm->m_unStreamNum,
                     chASMFlags, 
                     uASMRule);
    }
    else
    {
        pPacket = new CHXRTPPacket;
        pPacket->AddRef();

        pStrm->SequentializeRMATS(ulHXTime);

        ((IHXRTPPacket*) pPacket)->SetRTP(pDataBuf, 
                                          ulHXTime,
                                          ulRTPTime,
                                          pStrm->m_unStreamNum,
                                          chASMFlags, 
                                          uASMRule);
    }

    RecvRule((chASMFlags & HX_ASM_SWITCH_ON), uASMRule);

    /*
     * Add this packet to the list of pending packets and process
     * the list
     */
#ifdef _DO_BUFFER_OCCUPANCY_CHECKS
    if (pStrm->m_pTransBuf->GetQueuedPktCount() > MAX_BUFFERED_STREAM_PACKETS)
    {
        // Too many packets queued: discard the oldest ones until the
        // queue is back at the unlock threshold.
        IHXPacket* pDeadPacket = NULL;
        while (pStrm->m_pTransBuf->GetQueuedPktCount() > UNLOCK_THRESHOLD_LEVEL)
        {
            if (pStrm->GetPacket(pDeadPacket, ulTimeNow) != HXR_OK)
            {
                break;
            }

            HX_RELEASE(pDeadPacket);
        }
    }
#endif      // _DO_BUFFER_OCCUPANCY_CHECKS

    if ((!IsRMSource()) || AreDependenciesOK(uASMRule))
    {
#ifdef XXXGo_DEBUG
        //fprintf(m_file, "+++ %u Adding Packet\n", iStream);
#endif
        pStrm->m_pTransBuf->AddPacket(pkt.seq_no, pPacket, ulTimeNow);
    }
    else
    {
        /*
         * Check to see if the dependencies have been filled and add
         * the packet if they have
         */
        if (IsDependOK((chASMFlags & HX_ASM_SWITCH_ON), uASMRule))
        {
            SetSyncOK(uASMRule);
            pStrm->m_pTransBuf->AddPacket(pkt.seq_no, pPacket, ulTimeNow);
        }
    }

    // Drop the local references taken above.
    pDataBuf->Release();
    pPacket->Release();

#ifdef XXXGo_DEBUG
//fprintf(m_pLogFile, "HandleRTP: ssrc = %u, t.s. = %u, #Sources = %d\n", 
//          pkt.ssrc, lHXTime, GetNumSrc());
#endif

    m_pPlugin->ProcessPendingPackets();

    // The socket is re-checked here, unlike on the early-return paths
    // above, before re-arming the read.
    if (m_pRTPSock)
    {
        m_pRTPSock->Read(1024);
    }

    return HXR_OK;
}

// Mask/value pair applied to the first 16 bits of a compound RTCP packet:
// 0xe0fe selects the version bits, the padding bit and the packet type
// without its lowest bit; 0x80c8 requires version 2, no padding and a
// packet type of 200 or 201.
#define RTCP_VALID_MASK     (0xe0fe)
#define RTCP_VALID_VALUE    (0x80c8)

/*
 *  Handles the completion of a read on the RTCP socket.
 *
 *  status - result of the read.
 *  pBuf   - buffer holding one (possibly compound) RTCP packet.
 *
 *  The compound packet is first validated: header check on the first
 *  packet, then a walk over all length fields which must end exactly at
 *  the end of the buffer.  If any RR in it carries our own SSRC the whole
 *  packet is ignored.  Afterwards each contained packet is unpacked and
 *  dispatched:
 *    SR   - marks the stream as heard, feeds the syncer (non-RM streams),
 *           stores LSR and the arrival time.
 *    BYE  - marks/removes every listed source or member, then stops.
 *    SDES - stores the items for the stream/member found by a preceding
 *           SR/RR in the same compound packet.
 *    RR   - registers/refreshes a non-sender member.
 *
 *  A failed read or NULL buffer returns HXR_OK immediately without
 *  re-arming the socket.  Otherwise another Read(1024) is issued on the
 *  RTCP socket (if present) and theErr, which is never set to anything
 *  but HXR_OK here, is returned.
 */
HX_RESULT
CPurePlaySource::HandleRTCPMsg(HX_RESULT status, IHXBuffer* pBuf)
{
    if (status != HXR_OK || !pBuf)
    {
        HX_ASSERT(FALSE);
        return HXR_OK;
    }

    pBuf->AddRef();

    // NOTE: every local with an initializer is declared before the first
    // "goto done" so that no jump bypasses an initialization.
    HX_RESULT   theErr          = HXR_OK;
    BYTE*       pFirst          = pBuf->GetBuffer();
    BYTE*       pNext           = pFirst;
    // length of the compound RTCP pkt in bytes
    UINT32      ulCompoundLen   = pBuf->GetSize();
    CStream*    pStrm           = NULL;
    CMember*    pMember         = NULL;
    BYTE*       pCompound       = pFirst;
    BYTE*       pCompoundEnd    = NULL;
    UINT16      nRTCPHeader     = 0;
    UINT8       uchVersion      = 0;
    UINT16      unLength        = 0;
    UINT8       uchType         = 0;
    BYTE*       uchOff          = NULL;

    if (m_pRTCPInterval)
    {
        m_pRTCPInterval->UpdateAvgRTCPSize(ulCompoundLen);
    }

    /* 
    **      validity check
    */
    // An RTCP packet is at least one 4-byte common header long.
    if (!pFirst || ulCompoundLen < 4)
    {
        HX_ASSERT(FALSE && "invalid RTCP length");
        goto done;
    }

    nRTCPHeader = getshort(pCompound);

    if ((nRTCPHeader & RTCP_VALID_MASK) != RTCP_VALID_VALUE)
    {
        HX_ASSERT(FALSE && "invalid RTCP header");
        /* something is wrong */ 
        goto done;
    }
    pCompoundEnd = pCompound + ulCompoundLen;

    do 
    {
        // Never read a header that is not completely inside the buffer.
        if ((pCompoundEnd - pCompound) < 4)
        {
            HX_ASSERT(FALSE && "invalid RTCP length");
            goto done;
        }

        uchOff = pCompound;
        uchVersion  = (*uchOff++ & 0xc0) >> 6;
        uchType     = *uchOff++;

        unLength    = *uchOff++ << 8;
        unLength    |= *uchOff++;

        // If it's our own RR, just ignore!
        if (RTCP_RR == uchType) 
        {
            // The sender SSRC follows the header; make sure it is there.
            if ((pCompoundEnd - uchOff) < 4)
            {
                HX_ASSERT(FALSE && "invalid RTCP length");
                goto done;
            }

            UINT32 ulRrSsrc;
            ulRrSsrc  = ((UINT32)*uchOff++)<<24; 
            ulRrSsrc |= ((UINT32)*uchOff++)<<16;
            ulRrSsrc |= ((UINT32)*uchOff++)<<8; 
            ulRrSsrc |= ((UINT32)*uchOff++);
            if (ulRrSsrc == m_ulThisSrcID)
            {
                // this is our own RR
                goto done;
            }
        }

        // The length field counts 32-bit words minus one.  Reject a packet
        // that claims to extend past the end of the buffer.
        const UINT32 ulPktBytes = ((UINT32)unLength + 1) * 4;
        if (ulPktBytes > (UINT32)(pCompoundEnd - pCompound))
        {
            HX_ASSERT(FALSE && "invalid RTCP length");
            /* something is wrong */
            goto done;
        }

        pCompound += ulPktBytes;
    } while (pCompound < pCompoundEnd && uchVersion == 2);

    if (pCompound != pCompoundEnd)
    {
        // The walk stopped early, which can only be caused by a packet
        // whose version is not 2.
        HX_ASSERT(uchVersion == 2);
        goto done;
    }

    /*
    * This may be a compound RTCP packet so keep unpacking until we
    * have unpacked everything.
    */ 
    while (pNext && pNext < pCompoundEnd)
    {
        RTCPPacket  pkt;

        pNext = pkt.unpack(pNext, pCompoundEnd - pNext);

        // erase:  Look in rmartsp rtpwrap.h unpack
        // sometimes sdes_data is NULL for some reason.
        if (NULL == pNext)
        {
            HX_ASSERT(FALSE && "sdes_data empty");
            break;
        }

        switch (pkt.packet_type)
        {
        case RTCP_SR:
            {
                // get the right source from ssrc
                if (!GetStreamBySSRC(pkt.sr_ssrc, pStrm))
                {
                    /*
                     *  Unless we receive at least one RTP pkt (which can 
                     *  be used to make sure payload type is the same), don't
                     *  take any SR
                     */
                    goto done;
                }
                HX_ASSERT(pStrm);

                pStrm->m_bHeardSinceLastTime = TRUE;
                pStrm->m_ulNumRRIntervals    = 0;

                if (!pStrm->m_bMadeCut)
                {
                    goto done;
                }

                if (!pStrm->IsRMStream())
                {
                    pStrm->m_Syncer.HandleRTCPSync(NTPTime(pkt.ntp_sec, pkt.ntp_frac),
                                                   pkt.rtp_ts);
                }

#ifdef _SYNC_TRACE
                if (m_sfile)
                {
                    fprintf(m_sfile, "syncing ssrc(%u): TS(%u) -> %u\n",
                            pkt.sr_ssrc, 
                            pStrm->m_Syncer.GetSyncOffsetRTP(), 
                            pStrm->m_Syncer.GetSyncOffsetHX());
                    fflush(m_sfile);
                }
#endif      // _SYNC_TRACE

                // it's better come from sender of RTP pkt
                if (pStrm->m_ulSSRC != pkt.sr_ssrc)
                {
                    HX_ASSERT(FALSE && "wrong ssrc in SR");
                    goto done;
                }

                // save SR ts
                // the middle 32 bits out of 64 in the NTP timestamp:
                // the low 16 bits of the seconds followed by the high
                // 16 bits of the fraction.
                pStrm->m_ulLSR = ((pkt.ntp_sec & 0xFFFF) << 16) |
                                 ((pkt.ntp_frac >> 16) & 0xFFFF);

                // save current time for later use 
                pStrm->m_ulLastSRArrivalTime = HX_GET_TICKCOUNT();
            }
            break;

        case RTCP_BYE:
            {
                // In this RTCP, there should be nothing after Bye
                UINT32* pByeSsrc;
                // remove all the source in this bye
                for (UINT32 i = 0; i < pkt.count; i++)
                {
                    pByeSsrc = pkt.bye_src + i;

#ifdef XXXGo_DEBUG
                    if (m_pFFLog)
                    {
                        fprintf(m_pFFLog, "BYE!!!\n");
                    }
#endif

                    // we don't want to end this stream right away as it still
                    // has some packets in the buffer that needs to be played
                    if (!MarkAsEnd(*pByeSsrc))
                    {
                        if (!RemoveSource(*pByeSsrc))
                        {
                            // this is BYE from non-sender
                            RemoveMember(*pByeSsrc);
                        }
                    }
                }

                theErr = HXR_OK;

                if (m_pByePkt && m_pRTCPInterval)
                {
                    // RTCP_BYE is pending.
                    m_pRTCPInterval->SetCurNumMembers(m_pRTCPInterval->GetCurNumMembers()+1);
                }

                goto done;
            }
            break;

        case RTCP_SDES:
            {
                // The SDES items are attributed to the stream found by a
                // preceding SR/RR lookup, or else to the member found by a
                // preceding RR.
                if (!pStrm)
                {
                    HX_ASSERT(pMember);
                    if (!pMember)
                    {
                        goto done;
                    }
                }
                else
                {
                    pMember = pStrm;
                }
                /*******************
                * get at least CNAME
                ********************/
                // look for list of sdes item with source id
                CHXSimpleList* pSdesList;
                SDESItem* pSdes;

                if (pkt.m_mapSDESSources.Lookup(pMember->m_ulSSRC, (void*&)pSdesList))
                {
                    // set up variables to iterate over list
                    int nNumElements = pSdesList->GetCount();
                    LISTPOSITION lpPosition = pSdesList->GetHeadPosition();

                    // Go through all list elements and store each item on
                    // the member.
                    for (int i=0; i < nNumElements; i++)
                    {
                        pSdes = (SDESItem*) pSdesList->GetNext(lpPosition);

                        // set up
                        CHXString** ppStr = NULL;

                        if (SDES_CNAME == pSdes->sdes_type)
                            ppStr = &pMember->m_pCName;
                        else if (SDES_NAME == pSdes->sdes_type)
                            ppStr = &pMember->m_pName;
                        else if (SDES_EMAIL == pSdes->sdes_type)
                            ppStr = &pMember->m_pEmail;
                        else if (SDES_PHONE == pSdes->sdes_type)
                            ppStr = &pMember->m_pPhone;
                        else if (SDES_LOC == pSdes->sdes_type)
                            ppStr = &pMember->m_pLoc;
                        else if (SDES_TOOL == pSdes->sdes_type)
                            ppStr = &pMember->m_pTool;
                        else if (SDES_NOTE == pSdes->sdes_type)
                            ppStr = &pMember->m_pNote;
                        else if (SDES_PRIV== pSdes->sdes_type)
                            ppStr = &pMember->m_pPriv;
                        else
                        {
                            // Unknown item type coming off the wire: there
                            // is nowhere to store it, so skip it rather
                            // than dereference a NULL ppStr.
                            HX_ASSERT(FALSE && "wrong sdes_type");
                            continue;
                        }

                        if (!*ppStr)
                        {
                            *ppStr = new CHXString(pSdes->data);
                        }
                        else if (**ppStr != (const char*)pSdes->data)
                        {
                            // since we keyed off from ssrc, ssrc might 
                            // have been changed (restart/collision)
                            // XXXGH need to update maps
                            /*
                            char sz[100];
                            memset(sz, 0, 100);
                            wsprintf( sz, "what we had = %s\n"
                                          "new one     = %s ", 
                                       (*ppStr)->GetBuffer(10),
                                       pSdes->data);
                            ::MessageBox( NULL, sz, "SDES not the same", MB_OK );

                            HX_ASSERT(FALSE && "sdes not the same");
                            */
                        }

                        /*
                        * XXXGH
                        * somehow, we have to store ssrc that comes with 
                        * SDES, so we can correctly map it
                        */
                    }
                }
            }
            break;

        case RTCP_RR:
            {
                if (pkt.rr_ssrc == m_ulThisSrcID)
                {
                    // just ignore our own RR.
                    goto done;
                }

                if (!GetStreamBySSRC(pkt.rr_ssrc, pStrm))
                {
                    // this is not a sender
                    if (!GetMember(pkt.rr_ssrc, pMember))
                    {
                        pMember = NULL;
                        AddNewMember(pkt.rr_ssrc, pMember);
                    }

                    if (!pMember)
                    {
                        // The member could not be created; nothing to
                        // refresh for this report.
                        break;
                    }
#ifdef XXXGo_DEBUG
                    if (m_pFFLog)
                    {
                        fprintf(m_pFFLog, "put %d %d\n", 
                                pMember->m_binInfo.lBin,
                                pMember->m_binInfo.pos);
                        fflush(m_pFFLog);
                    }
#endif
                    // Refresh the member's timeout by moving it into the
                    // newest bin.
                    m_MemberTimeoutBins.PutInNewestBin(pMember, pMember->m_binInfo);
                }
            }
            break;

        case RTCP_APP:
            {
                //::MessageBox( NULL, NULL, "RTCP_APP", MB_OK );
            }
            break;

        default:
            break;
        }
    } // end while


done:
    HX_RELEASE(pBuf);
    if (m_pRTCPSock)
    {
        m_pRTCPSock->Read(1024);  
    }

    return theErr;
}

/*
 *  Asks every stream in m_SsrcToStream that has made the cut and has a
 *  non-zero clip bandwidth registry ID to update its statistics.
 *
 *  pReg and ulTimeNow are passed through unchanged to
 *  CStream::UpdateStatistics().  Always returns HXR_OK.
 */
HX_RESULT
CPurePlaySource::UpdateStatistics(IHXRegistry* pReg, ULONG32 ulTimeNow)
{
    for (CHXMapLongToObj::Iterator iter = m_SsrcToStream.Begin();
         iter != m_SsrcToStream.End();
         ++iter)
    {
        CStream* pCurStream = (CStream*) (*iter);

        HX_ASSERT(pCurStream);

        // Skip entries that are missing, have not made the cut, or have
        // no registry ID to report the bandwidth under.
        if (!pCurStream ||
            !(pCurStream->m_bMadeCut) ||
            (0 == pCurStream->m_ulClipBandwidthID))
        {
            continue;
        }

        pCurStream->UpdateStatistics(pReg, ulTimeNow);
    }

    return HXR_OK;
}


/*
 *  Fills pktRR with an RTCP receiver report for this receiver.
 *
 *  The first pass over m_SsrcToStream counts the streams heard since the
 *  last report and ages the silent ones: a stream that made the cut is
 *  removed after 5 silent intervals, any other stream after 2.
 *
 *  The second pass collects one reception report per heard stream and
 *  clears its m_bHeardSinceLastTime flag.  The count field of the RTCP
 *  header is only 5 bits wide, so at most 31 report blocks are written;
 *  streams beyond that keep their "heard" flag and are picked up by a
 *  later report.
 *
 *  Returns HXR_FAIL if the report array cannot be allocated, otherwise
 *  HXR_OK.
 */
HX_RESULT
CPurePlaySource::MakeRR(REF(RTCPPacket) pktRR)
{
/*
    if (GetNumSrc() == 0)
    {
      // it dones't make sense to even try to send RR when there is no sender
      return HXR_FAIL;
    }
*/    
    // Largest value the 5-bit reception report count can hold.
    const UINT32 ulMaxReportBlocks = 31;

    /*
    * this is a good chance to look at any inactive source and get rid of it
    * from the map
    */
    // NOTE(review): RemoveSource() is called while m_SsrcToStream is being
    // iterated; presumably the map tolerates removal during iteration -
    // confirm against the map implementation.
    CHXMapLongToObj::Iterator i;
    UINT32 ulSrcCount = 0;
//    UINT32 ulNumInactive = 0;
    CStream* pStrm = NULL;

    for (i  = m_SsrcToStream.Begin(); i != m_SsrcToStream.End(); ++i)
    {
        pStrm = (CStream*)(*i);
        if (pStrm->m_bHeardSinceLastTime)
        {
            ulSrcCount++;
        }
        else 
        {
            if (pStrm->m_bMadeCut)
            {
                // 2 RTCP interval is too small.  Once we remove this source,
                // it will call StreamDone on this stream.  so, a little 
                // congestion end up with end of feed.
                if (++pStrm->m_ulNumRRIntervals >= 5)
                {
#ifdef XXXGo_DEBUG
                    if (m_pLogFile)
                    {
                        fprintf(m_pLogFile, "SenderTimeout! Stream# %u\n", pStrm->m_unStreamNum);
                        fflush(m_pLogFile);
                    }
#endif

                    RemoveSource(pStrm->m_ulSSRC);
//                  ulNumInactive++;
                }
            }
            else
            {
                // rest of sources can come and go.  Much important to get the 
                // number of sources right!
                if (++pStrm->m_ulNumRRIntervals >= 2)
                {
                    RemoveSource(pStrm->m_ulSSRC);
                }
            }
        }
    }

#if 0
    // if we haven't heard from any of sources for 5 RR intervals,
    // assume end
    if ((GetNumSrc() != 0) && (GetNumSrc() == ulNumInactive))
    {
      for (i  = m_StrmIdToStrm.Begin(); i != m_StrmIdToStrm.End(); ++i)
      {
          RemoveSource(((CStream*)(*i))->m_ulSSRC);
      }
      return HXR_FAIL;
    }
#endif

    // Never advertise more report blocks than the header can describe.
    if (ulSrcCount > ulMaxReportBlocks)
    {
        ulSrcCount = ulMaxReportBlocks;
    }

    pktRR.version_flag = 0x02;      // protocol version
    pktRR.padding_flag = 0;         // no padding
    pktRR.packet_type = RTCP_RR;    // this is RR
    pktRR.rr_ssrc = m_ulThisSrcID;  // receiver generating this report

    // If there is no data transmission or reception to report, just add empty 
    // RR pkt with count = 0
    if (0 == ulSrcCount)
    {
        pktRR.count = 0;
        pktRR.length = 1;
    }
    else
    {
        pktRR.count = (UINT8)ulSrcCount;
        // length is in 32-bit words minus one: 1 word for our SSRC plus
        // 6 words (24 bytes) per report block
        pktRR.length = 1 + 6 * (UINT16)ulSrcCount;

        ReceptionReport* pRr = new ReceptionReport[ulSrcCount];

        if (!pRr)
        {
            return HXR_FAIL;
        }

        UINT32 ulCount = 0;
        for (i  = m_SsrcToStream.Begin();
             i != m_SsrcToStream.End() && ulCount < ulSrcCount; 
             ++i)
        {
            if (((CStream*)(*i))->m_bHeardSinceLastTime)
            {
                ((CStream*)(*i))->GetReceptionReport(pRr[ulCount]);
                ((CStream*)(*i))->m_bHeardSinceLastTime = FALSE;
                ulCount++;
            }
        }

        HX_ASSERT(ulCount == ulSrcCount);
        // XXXGo ReceptionReport::static_size() returns 24 because lost is in fact 
        // 24 bits.  However, since it is represented as UINT32, I really want 25.
        // so, calling sizeof() for now.
//          pktRR.SetReceiverReport(pRr, sizeof(ReceptionReport) * ulSrcCount);
        pktRR.SetReceiverReport(pRr, ulSrcCount);

        // since SetReceiverReport is making a copy, delete what we have
        HX_VECTOR_DELETE(pRr);
    } 
    return HXR_OK;
}

/*
 *  Schedules a CRTCPSendStatusCallback for lEvent to run ulNextTime from
 *  now (the value is handed unchanged to the scheduler's RelativeEnter()).
 *
 *  Any callback that is still pending is removed first, so at most one
 *  callback handle is tracked in m_hStatusCallbackID.  Does nothing when
 *  there is no scheduler or the callback object cannot be allocated.
 */
void
CPurePlaySource::Schedule(UINT32 ulNextTime, int lEvent)
{
    if (NULL == m_pScheduler)
    {
        return;
    }

    // Cancel the callback that is still outstanding, if any.
    if (0 != m_hStatusCallbackID)
    {
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "Re - ");
            fflush(m_pLogFile);
        }
#endif

        m_pScheduler->Remove(m_hStatusCallbackID);
        m_hStatusCallbackID = 0;
    }

    HX_ASSERT(0 == m_hStatusCallbackID);

    CRTCPSendStatusCallback* pCallback =
        new CRTCPSendStatusCallback(this, lEvent);

    if (pCallback)
    {
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "Schduling (%u)\n", ulNextTime);
            fflush(m_pLogFile);
        }
#endif

        // Hold our own reference only for the duration of the call.
        pCallback->AddRef();
        m_hStatusCallbackID = m_pScheduler->RelativeEnter(pCallback, ulNextTime);
        pCallback->Release();
    }
}

/*
 *  Timer callback for the RTCP event lEvent.
 *
 *  Timed-out members are removed first.  Then the function decides
 *  whether to transmit now or to reschedule:
 *
 *    RTCP_BYE     - the interval is recomputed from the current member and
 *                   sender counts (with the compensated bandwidth); the BYE
 *                   is sent if that interval has elapsed since the last
 *                   RTCP time, otherwise it is rescheduled for the
 *                   remainder.
 *    anything else - depending on m_algorithm (A, B, or C which behaves
 *                   like B while the interval object is "initial" and like
 *                   A afterwards) a receiver report is sent, or the timer
 *                   is rescheduled for one full interval.
 *
 *  Without an RTCP interval object nothing can be computed, so the
 *  function returns right after the member cleanup.
 */
void
CPurePlaySource::OnExpire(INT32 lEvent)
{
#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "\tOnExpire\n");
        fflush(m_pLogFile);
    }
#endif

    // Remove any member that timeout.
    CleanupMemberTable();

    // Everything below reads from or writes to the interval object.
    if (!m_pRTCPInterval)
    {
        return;
    }

    /*
     *  This func is responsible for deciding whether to send an RTCP report or 
     *  BYE packet now, or to reschedule transmission.
     *  It is also responsible for updating the pmembers, initial, tp, and 
     *  avg_rtcp_size state var's.  This func should be called upon expiration 
     *  of the event timer.
     */
    UINT32 ulInterval = 0;      /* Interval */
//    double  tn; /* Next transmit time */
    BOOL   bSendIt = FALSE;     /* flag for sending pkt */

    UINT32 ulCurTime = GetCurrentSchedulerTimeInMsec(m_pScheduler);

//    HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
    // in msec.
//    UINT32 ulCurTime = lTime.tv_sec * 1000 + lTime.tv_usec / 1000;   
    UINT32 ulLastTime = m_pRTCPInterval->GetLastRTCPTime();

    // ulNumMembers + ulNumSenders = Total num members
    UINT32 ulNumMembers = m_SsrcToMember.GetCount();
    UINT32 ulNumSenders = GetTrueNumSrc();

    /* 
     *  To compensate for OPTION B converging to a value below the intended 
     *  average.
     */
    double const COMPENSATION = 2.71828 - 1.5;

    /*
     *  In the case of a BYE, we use OPTION B to reschedule the transmission of 
     *  BYE if necessary.
     */
    if (lEvent == RTCP_BYE)
    {
#ifdef XXXGo_DEBUG
        if (m_pLogFile)
        {
            fprintf(m_pLogFile, "\t\tCurNumMembers %u, NumSenders %u\n",
                                m_pRTCPInterval->GetCurNumMembers(), 
                                m_pRTCPInterval->GetNumSenders());
            fflush(m_pLogFile);
        }
#endif

        ulInterval = (UINT32)(m_pRTCPInterval->GetRTCPInterval(
                                m_pRTCPInterval->GetRTCPBW() * COMPENSATION,
                                m_pRTCPInterval->GetCurNumMembers(), 
                                m_pRTCPInterval->GetNumSenders())
                     * 1000.0);

        if (ulLastTime + ulInterval <= ulCurTime)
        {
            SendBye();
        }
        else
        {
#ifdef XXXGo_DEBUG
            if (m_pLogFile)
            {
                fprintf(m_pLogFile, "\t\t%u + %u (%u)> %u\n", ulLastTime, ulInterval, 
                        ulLastTime + ulInterval, ulCurTime);
                fflush(m_pLogFile);
            }
#endif
            // reschedule this for the time that is still left
            Schedule(ulLastTime + ulInterval - ulCurTime , RTCP_BYE);
        }

        return;
    }

    HX_ASSERT(lEvent != RTCP_BYE);

    if ((m_algorithm == ALGORITHM_A) ||
        ((m_algorithm == ALGORITHM_C) && !m_pRTCPInterval->IsInitial()))
    {
        ulInterval = (UINT32) 
                (m_pRTCPInterval->GetRTCPInterval(
                    m_pRTCPInterval->GetRTCPBW(),
                    ulNumMembers, ulNumSenders) * 1000.0);

        // Send right away if the group has not grown since last time,
        // otherwise only once the recomputed interval has elapsed.
        if (ulNumMembers + ulNumSenders <= m_pRTCPInterval->GetLastNumMembers())
        {
            bSendIt = TRUE;
        }
        else
        {
            if (ulLastTime + ulInterval <= ulCurTime)
            {
                bSendIt = TRUE;
            }
        }
    }
    else if ((m_algorithm == ALGORITHM_B) ||
             ((m_algorithm == ALGORITHM_C) && m_pRTCPInterval->IsInitial()))
    {
        ulInterval = (UINT32) 
                (m_pRTCPInterval->GetRTCPInterval(
                    m_pRTCPInterval->GetRTCPBW() * COMPENSATION,
                    ulNumMembers, ulNumSenders) * 1000.0);

        if (ulLastTime + ulInterval <= ulCurTime)
        {
            bSendIt = TRUE;
        }
    }

    if (bSendIt)
    {
        // A zero packet size is not folded into the average.
        UINT32 ulPktSize = SendRecvReport();
        if (ulPktSize)
        {
            m_pRTCPInterval->UpdateAvgRTCPSize(ulPktSize);
        }
        m_pRTCPInterval->SetLastRTCPTime(ulCurTime);
    }
    else
    {
        Schedule(ulInterval, RTCP_SR);
    }

    m_pRTCPInterval->SetLastNumMembers(ulNumMembers + ulNumSenders);
}


// Don't delete the map itself.  Just entries in the map!
void
CPurePlaySource::CleanupMemberTable()
{
    CHXSimpleList* pBin = NULL;
    if (SUCCEEDED(m_MemberTimeoutBins.GetOldestBin(pBin)))
    {
      if (!pBin->IsEmpty())
      {
          CHXSimpleList::Iterator i;
          for (i = pBin->Begin(); i != pBin->End(); ++i)
            {
            // this will remove this entry from m_SsrcToMember and
            // delete the obj.
            RemoveMember(((CMember*)(*i))->m_ulSSRC, FALSE);
          }
          pBin->RemoveAll(); 
      }    
    }

    m_MemberTimeoutBins.UpdateBins();
}


/*
 * Build an RR + SDES compound RTCP packet and write it to the RTCP socket.
 * Whether or not a packet goes out, the next RTCP_SR event is scheduled
 * before returning.
 *
 * Returns the size (as reported by the buffer) of the packet that was
 * written, or 0 when nothing was sent: source admission still open, no RTCP
 * socket, RR or SDES could not be built, or the buffer allocation failed.
 */
UINT32
CPurePlaySource::SendRecvReport(void)
{
    // All locals are declared up front so that no "goto bail" jumps over
    // a declaration.
    UINT32          ulPktSize = 0;
    UINT32          ulLenRR = 0;
    UINT32          ulLenSDES = 0;
    unsigned char*  pBuf = NULL;
    IHXBuffer*      pUDPPacket = NULL;
    RTCPPacket      pktRR;
    RTCPPacket      pktSDES;
    CSDES           Sdes(m_pCName, m_pName, m_pTool, m_pEmail);

    // wait until admission is closed
    if (!m_pPlugin->IsSourceAdmissionClosed())
    {
        goto bail;
    }

    // Same guard SendBye() and ScheduleBye() apply: without a socket there
    // is nowhere to write the report.
    if (!m_pRTCPSock)
    {
        goto bail;
    }

    /*
     * Form the RR and SDES compound packet then write it out
     * to the socket
     */
    if (HXR_OK != MakeRR(pktRR))
    {
        // we can't send RTCP pkt w/o RR
        goto bail;
    }

    if (HXR_OK != Sdes.MakeSDES(pktSDES, m_ulThisSrcID))
    {
        // no SDES, no RTCP pkt
        goto bail;
    }

    pUDPPacket = new CHXBuffer;
    if (!pUDPPacket)
    {
        // allocation failed: skip this report, but still schedule the next
        goto bail;
    }
    pUDPPacket->AddRef();

    // "length" is in 32-bit words minus one, hence (length + 1) * 4 bytes.
    pUDPPacket->SetSize(((pktRR.length + 1) * 4) + ((pktSDES.length + 1) * 4));

    pBuf = pUDPPacket->GetBuffer();
    pBuf = pktRR.pack(pBuf, ulLenRR);
    pBuf = pktSDES.pack(pBuf, ulLenSDES);

    HX_ASSERT(ulLenRR == (UINT32)(pktRR.length + 1) * 4);
    HX_ASSERT(ulLenSDES == (UINT32)(pktSDES.length + 1) * 4);

    m_pRTCPSock->WriteTo(m_ulMultiAddr, m_uMultiPort + 1, pUDPPacket);

    // A report has gone out: leave the "initial" state and remember that
    // RTCP was sent (SendBye() refuses to send a BYE otherwise).
    m_pRTCPInterval->SetInitial(FALSE);
    m_bRTCPSent = TRUE;
    ulPktSize = pUDPPacket->GetSize();

#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        UINT32 ulTime = GetCurrentSchedulerTimeInMsec(m_pScheduler);
        fprintf(m_pLogFile, "\t\tSendRevReport (%u)\n", ulTime);
        fflush(m_pLogFile);
    }
#endif

bail:
    /*
     * Schedule the next status report
     */
    UINT32  ulNextTime = (UINT32)
            (m_pRTCPInterval->GetRTCPInterval(
                    m_pRTCPInterval->GetRTCPBW(),
                    m_SsrcToMember.GetCount(),  // #members
                    GetTrueNumSrc())            // #senders
             * 1000.0);
    Schedule(ulNextTime, RTCP_SR);

    HX_RELEASE(pUDPPacket);

    return ulPktSize;
}

/*
 * Build an RR + BYE compound RTCP packet into a freshly allocated buffer.
 *
 * pPkt  [out] receives the new buffer with one reference held for the
 *             caller.  On failure it is NULL (allocation failed, or the
 *             buffer was released via HX_RELEASE because no RR could be made).
 *
 * Returns TRUE when the packet was built, FALSE otherwise.
 */
BOOL  
CPurePlaySource::MakeBye(REF(IHXBuffer*) pPkt)
{
    pPkt = new CHXBuffer();
    if (!pPkt)
    {
        return FALSE;
    }
    pPkt->AddRef();

    // A BYE is only sent together with an RR.
    RTCPPacket pktRR;
    if (HXR_OK != MakeRR(pktRR))
    {
        HX_RELEASE(pPkt);
        return FALSE;
    }

    RTCPPacket pktBye;

    pktBye.version_flag = 0x02;
    pktBye.padding_flag = 0;
    pktBye.packet_type = RTCP_BYE;
    pktBye.length = 1;           // len in 32-bits words minus one
    pktBye.count = 1;            // exactly one source: ourselves

    // use access function
    pktBye.SetByeSrc(&m_ulThisSrcID, pktBye.count);

    pPkt->SetSize(((pktRR.length + 1) * 4) + ((pktBye.length + 1) * 4));

    unsigned char*   pBuf = pPkt->GetBuffer();
    UINT32  ulLenRR;
    UINT32  ulLenBye;

    pBuf = pktRR.pack(pBuf, ulLenRR);
    pBuf = pktBye.pack(pBuf, ulLenBye);

    // Both packed lengths must match the sizes used for SetSize() above.
    // (The RR check used to be an assignment, which made it vacuous.)
    HX_ASSERT(ulLenRR == (UINT32)(pktRR.length + 1) * 4);
    HX_ASSERT(ulLenBye == (UINT32)(pktBye.length + 1) * 4);

    return TRUE;
}

/*
 * Send the RTCP BYE packet (building it first if necessary) and then tear
 * the source down via Cleanup().
 *
 * No BYE is written when no RTCP packet has been sent yet, when there is no
 * RTCP socket, or when the BYE packet cannot be built; Cleanup() is called
 * in every case.
 */
void
CPurePlaySource::SendBye(void)
{
#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "\tSendBye()\n");
        fflush(m_pLogFile);
    }
#endif

    if (!m_bRTCPSent)
    {
        // if no RTCP packet has been sent, don't send the BYE pkt.
        goto bail;
    }

    HX_ASSERT(m_pRTCPInterval);
    if (!m_pRTCPSock)
    {
        // no RTCP packet has been sent
        goto bail;
    }

    if (!m_pByePkt)
    {
        if (!MakeBye(m_pByePkt))
        {
            // MakeBye() leaves the pointer NULL on failure, so that is
            // what must hold here (ScheduleBye() asserts the same thing).
            HX_ASSERT(!m_pByePkt);
            goto bail;
        }
    }

    HX_ASSERT(m_pByePkt);

    m_pRTCPSock->WriteTo(m_ulMultiAddr, m_uMultiPort + 1, m_pByePkt);

#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "\tSending()\n");
        fflush(m_pLogFile);
    }
#endif

bail:

    Cleanup();
}

/*
 * Arrange for an RTCP BYE to be sent.
 *
 * With fewer than 50 participants the BYE is sent immediately through
 * SendBye().  Otherwise the BYE packet is prepared, the interval bookkeeping
 * is reset to a single member/sender, and an RTCP_BYE event is scheduled.
 *
 * Returns TRUE when the BYE was sent or scheduled, FALSE when there is no
 * interval object, no RTCP has been sent yet, there is no socket or
 * scheduler, or the BYE packet could not be built.
 */
BOOL
CPurePlaySource::ScheduleBye(void)
{
#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "ScheduleBye %u\n", m_SsrcToMember.GetCount() + GetTrueNumSrc());
        fflush(m_pLogFile);
    }
#endif

    if (!m_pRTCPInterval)
    {
        HX_ASSERT(FALSE);
        return FALSE;
    }

    // no RTCP packet has been sent
    if (m_pRTCPInterval->IsInitial())
    {
        return FALSE;
    }
    if (NULL == m_pRTCPSock)
    {
        return FALSE;
    }

    // Small session: just send it.  With 50 or more participants an
    // immediate BYE from everyone may flood the network, so it is delayed.
    if (m_SsrcToMember.GetCount() + GetTrueNumSrc() < 50)
    {
        SendBye();
        return TRUE;
    }

    if (!m_pScheduler)
    {
        return FALSE;
    }

    // Build the BYE packet now; its size is needed for the interval below.
    if (!m_pByePkt)
    {
        if (!MakeBye(m_pByePkt))
        {
            HX_ASSERT(!m_pByePkt);
            return FALSE;
        }
    }
    HX_ASSERT(m_pByePkt);

    // we have to do some work.
    UINT32 ulNow = GetCurrentSchedulerTimeInMsec(m_pScheduler);

    m_pRTCPInterval->SetLastRTCPTime(ulNow);
    m_pRTCPInterval->SetCurNumMembers(1);
    m_pRTCPInterval->SetLastNumMembers(1);
    m_pRTCPInterval->SetInitial(TRUE);
    m_pRTCPInterval->SetNumSenders(1);
    m_pRTCPInterval->SetAvgRtcpSize(m_pByePkt->GetSize());

    // GetRTCPInterval() is scaled by 1000.0 to get the msec delay.
    UINT32 ulByeDelay =
        (UINT32)(m_pRTCPInterval->GetRTCPInterval(0.0, 1, 1) * 1000.0);
    Schedule(ulByeDelay, RTCP_BYE);

    return TRUE;
}


/*
 * Look up the stream registered under ulSsrc in m_SsrcToStream.
 * On success pStrm points at the stream and TRUE is returned; otherwise
 * pStrm is set to NULL and FALSE is returned.
 */
BOOL
CPurePlaySource::GetStreamBySSRC(UINT32 ulSsrc, CStream*& pStrm)
{
    if (!m_SsrcToStream.Lookup(ulSsrc, (void*&)pStrm))
    {
        // unknown SSRC
        pStrm = NULL;
        return FALSE;
    }

    // the stored stream must carry the SSRC it is keyed under
    HX_ASSERT(ulSsrc == pStrm->m_ulSSRC);
    return TRUE;
}

/*
 * Look up the member registered under ulSsrc in m_SsrcToMember.
 * On success pMember points at the member and TRUE is returned; otherwise
 * pMember is set to NULL and FALSE is returned.
 */
BOOL
CPurePlaySource::GetMember(UINT32 ulSsrc, CMember*& pMember)
{
    if (!m_SsrcToMember.Lookup(ulSsrc, (void*&)pMember))
    {
        // unknown SSRC
        pMember = NULL;
        return FALSE;
    }

    // the stored member must carry the SSRC it is keyed under
    HX_ASSERT(ulSsrc == pMember->m_ulSSRC);
    return TRUE;
}

/*
 * Remove the member keyed by ulSsrc from m_SsrcToMember and delete it.
 *
 * bRemoveFromBin  when TRUE the member is also taken out of its timeout
 *                 bin; callers that empty the bin themselves pass FALSE.
 *
 * Returns the result of the map's RemoveKey(), or FALSE when no member is
 * registered under ulSsrc.
 */
BOOL
CPurePlaySource::RemoveMember(UINT32 ulSsrc, BOOL bRemoveFromBin)
{
    CMember* pGone = NULL;

    if (!m_SsrcToMember.Lookup(ulSsrc, (void*&)pGone))
    {
        return FALSE;
    }

    HX_ASSERT(ulSsrc == pGone->m_ulSSRC);

    if (bRemoveFromBin)
    {
        // remove this entry from bin.
        m_MemberTimeoutBins.RemoveEntry(pGone->m_binInfo);
    }

    BOOL bRemoved = m_SsrcToMember.RemoveKey((ULONG32)ulSsrc);
    HX_DELETE(pGone);

    return bRemoved;
}


/*
*   Remove the source keyed by ulSsrc.
*
*   Two maps are involved: the SSRC map (m_SsrcToStream) and the ID map
*   (m_StrmIdToStrm).
*
*   SSRC map:
*   - The key is always removed.
*
*   ID map:
*   - A source that did not make the source admission cut is not in this
*     map; the object is simply deleted.
*   - A source that did make the cut:
*     - !(RA && WantClientStats) -> remove the key and delete the object
*     - RA && WantClientStats    -> the only case in which the object is
*       kept around: the key stays in the ID map and nothing is deleted.
*     Either way the plugin is told the stream is done.
*
*   Returns the result of the last RemoveKey() performed, or FALSE when no
*   source is registered under ulSsrc.
*/
BOOL
CPurePlaySource::RemoveSource(UINT32 ulSsrc)
{
    CStream* pSource = NULL;

    if (!m_SsrcToStream.Lookup(ulSsrc, (void*&)pSource))
    {
        return FALSE;
    }

    HX_ASSERT(pSource->m_ulSSRC == ulSsrc);

    // first, remove this key from SSRC map
    BOOL bResult = m_SsrcToStream.RemoveKey((LONG32)ulSsrc);

    if (!pSource->m_bMadeCut)
    {
#ifdef _DEBUG
        // make sure this obj is not in ID map
        for (CHXMapLongToObj::Iterator it = m_StrmIdToStrm.Begin();
             it != m_StrmIdToStrm.End();
             ++it)
        {
            if (((CStream*)(*it)) == pSource)
            {
                // this can't happen
                HX_ASSERT(!"Source obj which didnt make the cut is in ID map");
            }
        }
#endif
        HX_DELETE(pSource);
        return bResult;
    }

    // m_unStreamNum should be correct; copy it, the object may be deleted
    // before the plugin is notified.
    UINT16 nStrmNum = pSource->m_unStreamNum;

    BOOL bKeepForStats = (m_pPlugin->WantClientStats() &&
                          m_pPlugin->IsRAStream(nStrmNum));
    if (!bKeepForStats)
    {
        bResult = m_StrmIdToStrm.RemoveKey(nStrmNum);
        HX_DELETE(pSource);
    }

    // this stream is done
    m_pPlugin->StreamDone(HXR_OK, nStrmNum);

    return bResult;
}

/*
 * Create a CStream for a newly seen SSRC and register it in m_SsrcToStream.
 *
 * The stream "makes the cut" when source admission is still open.  A stream
 * that made the cut is primed with whatever start time / sync anchor /
 * master sync values have already been distributed to this source.
 *
 * pStrm  [out] receives the new stream.
 * Returns FALSE when the stream could not be allocated, TRUE otherwise.
 */
BOOL
CPurePlaySource::AddNewEntryToMap(UINT32 ulSsrc, CStream*& pStrm)
{
    BOOL bMasterSync = FALSE;

#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "NewSrc #%u\n", ulSsrc);
        fflush(m_pLogFile);
    }
    if (m_pPlugin->IsRMPresentation())
    {
        if (GetTrueNumSrc() > 1)
        {
            // if this is RM presentation, there shouldn't be more
            // than one stream here!
            HX_ASSERT(FALSE);
        }
    }
#endif

    BOOL bAdmitted = (!m_pPlugin->IsSourceAdmissionClosed());

#ifdef _USE_MASTER_STREAM_SYNC_SCHEME
    // An admitted audio stream becomes the master sync stream when the
    // plugin does not have one yet.
    bMasterSync = (bAdmitted &&
                   (m_StreamType == CStream::STREAM_TYPE_AUDIO) &&
                   (!m_pPlugin->HasMasterSyncStream()));
#endif      // _USE_MASTER_STREAM_SYNC_SCHEME

    pStrm = new CStream(this,
                        ulSsrc,
                        0,
                        m_ulHXFactor,
                        m_ulRTPFactor,
                        m_pPlugin->m_pClassFactory,
                        bMasterSync);
    if (!pStrm)
    {
        return FALSE;
    }

    pStrm->m_bMadeCut = bAdmitted;

    if (bAdmitted && m_bStartTimeDistributed)
    {
        pStrm->m_Syncer.SetStartTime(m_ulStartTime, TRUE);
    }
    if (bAdmitted && m_bSyncAnchorDistributed)
    {
        pStrm->m_Syncer.AnchorSync(m_ulHXAnchorTime, m_ulNTPHXTime);
    }
    if (bAdmitted && m_bSyncDistributed)
    {
        pStrm->m_Syncer.HandleMasterSync(m_ulHXMasterTime,
                                         m_lHXOffsetToMaster);
    }

    m_SsrcToStream.SetAt(ulSsrc, (void*)pStrm);

    if (bMasterSync)
    {
        m_pPlugin->SetMasterSyncStream(pStrm);
    }

    return TRUE;
}

/*
 * Create a CMember for ulSsrc and register it in m_SsrcToMember.
 *
 * pMember  [out] receives the new member (NULL when allocation failed).
 * Returns FALSE when the member could not be allocated, TRUE otherwise.
 */
BOOL
CPurePlaySource::AddNewMember(UINT32 ulSsrc, CMember*& pMember)
{
    CMember* pFresh = new CMember(this, ulSsrc);
    pMember = pFresh;

    if (NULL == pFresh)
    {
        return FALSE;
    }

    m_SsrcToMember.SetAt(ulSsrc, (void*)pFresh);

#ifdef XXXGo_DEBUG
    if (m_pLogFile)
    {
        fprintf(m_pLogFile, "NewMember #%u\n", m_SsrcToMember.GetCount());
        fflush(m_pLogFile);
    }
#endif

    return TRUE;
}

// An RTCP_BYE was received for ulSsrc: mark that stream as ended so the
// Min Window Size drops to 0 and every packet left in the buffer can be
// played.
// Returns TRUE when the stream was marked, FALSE when ulSsrc is not a
// sender or its stream did not make the cut.
BOOL
CPurePlaySource::MarkAsEnd(UINT32 ulSsrc)
{
    CStream* pSender = NULL;

    if (!m_SsrcToStream.Lookup(ulSsrc, (void*&)pSender))
    {
        // not a sender
        return FALSE;
    }

    HX_ASSERT(pSender->m_ulSSRC == ulSsrc);

    //  do this only if this one made the cut
    if (!pSender->m_bMadeCut)
    {
        return FALSE;
    }

    // MarkAsEnd() will set Min Window Size to be 0 which will make
    // all packets available.
    pSender->MarkAsEnd();

    // we could be waiting for more packets.
    m_pPlugin->ProcessPendingPackets();

    return TRUE;
}


/*
 * Assign consecutive stream numbers, starting at unBaseStreamNum, to every
 * stream in m_SsrcToStream and register each one in m_StrmIdToStrm under
 * its number.
 *
 * Returns the first stream number that was NOT handed out (i.e. the base
 * plus the number of streams visited).
 */
UINT16
CPurePlaySource::SetBaseStreamNum(UINT16 unBaseStreamNum)
{
    UINT16 unNextNum = unBaseStreamNum;

    CHXMapLongToObj::Iterator it = m_SsrcToStream.Begin();
    while (it != m_SsrcToStream.End())
    {
        CStream* pCurrent = (CStream*)(*it);

        pCurrent->m_unStreamNum = unNextNum;
        m_StrmIdToStrm.SetAt(unNextNum, (void*)pCurrent);

#ifdef XXXGo_DEBUG
        // read the entry back and make sure it is the one just stored
        CStream* pCheck;
        pCheck = GetStreamByStrmId(unNextNum);
        if (unNextNum != pCheck->m_unStreamNum)
        {
            HX_ASSERT(FALSE);
        }
#endif

        unNextNum++;
        ++it;
    }

    return unNextNum;
}


/*
 * Return the stream registered in m_StrmIdToStrm under stream number ulId,
 * or NULL when there is none.
 */
CStream*
CPurePlaySource::GetStreamByStrmId(UINT32 ulId)
{
    CStream* pFound = NULL;

    if (!m_StrmIdToStrm.Lookup(ulId, (void*&)pFound))
    {
        return NULL;
    }

    // the stored stream must carry the number it is keyed under
    HX_ASSERT(ulId == pFound->m_unStreamNum);
    return pFound;
}


/*
* Set the minimum transport window size of every stream in m_SsrcToStream
* to 1.  Always returns HXR_OK.
*/
HX_RESULT
CPurePlaySource::StopBuffering()
{
    CHXMapLongToObj::Iterator it = m_SsrcToStream.Begin();
    while (it != m_SsrcToStream.End())
    {
        CStream* pCurrent = (CStream*)(*it);
        pCurrent->SetMinTransportWindowSize(1);
        ++it;
    }

    return HXR_OK;
}


/*
 * Pick a random SSRC for this source (m_ulThisSrcID) that is not already
 * used by any known stream or member.  Always returns HXR_OK.
 */
HX_RESULT
CPurePlaySource::MakeSsrc()
{
    // need a random number for SSRC
    // default constructor will set a seed using HX_GET_TICKCOUNT()
    CMultiplePrimeRandom randGen;

    CStream* pStrm = NULL;
    CMember* pMember = NULL;

    // Keep drawing from the SAME generator until nobody is using the id.
    // Constructing a new generator per retry would reseed it from the tick
    // count, which may not have advanced yet and could hand back the very
    // value that just collided.
    do
    {
        m_ulThisSrcID = randGen.GetRandomNumber();
    }
    while (GetStreamBySSRC(m_ulThisSrcID, pStrm) ||
           GetMember(m_ulThisSrcID, pMember));

    HX_ASSERT(!GetStreamBySSRC(m_ulThisSrcID, pStrm));
    HX_ASSERT(!GetMember(m_ulThisSrcID, pMember));

    return HXR_OK;
}


/******************************************************************
*  CPurePlaySource::RuleInfo
*
*/

// Start with every flag cleared and no dependency array.
CPurePlaySource::RuleInfo::RuleInfo()
    : m_bRuleOn(FALSE),
      m_bSyncOK(FALSE),
      m_bRecvOn(FALSE),
      m_pOnDepends(NULL)
{
}

// Free the dependency array, if one was ever attached.
CPurePlaySource::RuleInfo::~RuleInfo()
{
    if (NULL != m_pOnDepends)
    {
        delete [] m_pOnDepends;
    }
}


/////////////////////////////////////////////////////////////////////////////
// CPurePlaySource::CRTPResponseHandler
//

// Take one more reference; returns the count after the increment.
STDMETHODIMP_(UINT32)
CPurePlaySource::CRTPResponseHandler::AddRef(void)
{
    UINT32 ulNewCount = InterlockedIncrement(&m_lRefCount);
    return ulNewCount;
}

// Drop one reference; the object deletes itself when the count reaches
// zero.  Returns the count after the decrement (0 once deleted).
STDMETHODIMP_(UINT32)
CPurePlaySource::CRTPResponseHandler::Release(void)
{
    // Use the value handed back by the decrement itself.  Re-reading
    // m_lRefCount afterwards could observe another thread's AddRef/Release,
    // or touch the object after a concurrent final Release deleted it.
    LONG32 lRemaining = InterlockedDecrement(&m_lRefCount);
    if (lRemaining > 0)
    {
        return (UINT32)lRemaining;
    }

    delete this;
    return 0;
}

// Hand out IUnknown or IHXUDPResponse.  On success the object is AddRef'ed
// and HXR_OK is returned; for any other interface *ppInterfaceObj is set
// to NULL and HXR_NOINTERFACE is returned.
STDMETHODIMP
CPurePlaySource::CRTPResponseHandler::QueryInterface
(
 REFIID interfaceID,
 void** ppInterfaceObj
 )
{
    void* pInterface = NULL;

    // By definition all COM objects support the IUnknown interface
    if (IsEqualIID(interfaceID, IID_IUnknown))
    {
        pInterface = (IUnknown*)this;
    }
    else if (IsEqualIID(interfaceID, IID_IHXUDPResponse))
    {
        pInterface = (IHXUDPResponse*)this;
    }

    if (!pInterface)
    {
        // No other interfaces are supported
        *ppInterfaceObj = NULL;
        return HXR_NOINTERFACE;
    }

    AddRef();
    *ppInterfaceObj = pInterface;
    return HXR_OK;
}

// Forward a completed UDP read to the owning stream: RTP data goes to
// HandleRTPMsg(), RTCP data to HandleRTCPMsg().  ulAddr and nPort are not
// used.  Returns the handler's result, or HXR_FAIL when the port type is
// neither RTP_PORT nor RTCP_PORT.
STDMETHODIMP
CPurePlaySource::CRTPResponseHandler::ReadDone
(
 HX_RESULT  status,
 IHXBuffer* pBuffer,
 ULONG32    ulAddr,
 UINT16     nPort
 )
{
    if (m_portType == RTP_PORT)
    {
        return m_pStream->HandleRTPMsg(status, pBuffer);
    }

    if (m_portType == RTCP_PORT)
    {
        return m_pStream->HandleRTCPMsg(status, pBuffer);
    }

    return HXR_FAIL;
}


/////////////////////////////////////////////////////////////////////////////
// CPurePlaySource::CRTCPSendStatusCallback
//
CPurePlaySource::CRTCPSendStatusCallback::~CRTCPSendStatusCallback()
{
    // XXXGo
    // Intentionally empty.
    // When a user closes the player without hitting stop first, the
    // scheduler releases all the pending callbacks.  If this happens,
    // Func() never gets called and the BYE pkt is never sent.  Although we
    // could simply send the BYE pkt from here, that might flood the network.
    // Since avoiding the flooding of the network is the primary purpose of
    // using "reverse consideration", doing so is not desirable.
    // Therefore, we don't send the BYE pkt if that happens.

/*
    Disabled alternative that would fire the pending event on destruction:

    if (m_pStream)
    {
      m_pStream->m_hStatusCallbackID = 0;
      m_pStream->OnExpire(m_lEvent);
    } 
*/    
}

// Take one more reference; returns the count after the increment.
STDMETHODIMP_(UINT32)
CPurePlaySource::CRTCPSendStatusCallback::AddRef(void)
{
    UINT32 ulNewCount = InterlockedIncrement(&m_lRefCount);
    return ulNewCount;
}

// Drop one reference; the object deletes itself when the count reaches
// zero.  Returns the count after the decrement (0 once deleted).
STDMETHODIMP_(UINT32)
CPurePlaySource::CRTCPSendStatusCallback::Release(void)
{
    // Use the value handed back by the decrement itself.  Re-reading
    // m_lRefCount afterwards could observe another thread's AddRef/Release,
    // or touch the object after a concurrent final Release deleted it.
    LONG32 lRemaining = InterlockedDecrement(&m_lRefCount);
    if (lRemaining > 0)
    {
        return (UINT32)lRemaining;
    }

    delete this;
    return 0;
}

// Hand out IUnknown or IHXCallback.  On success the object is AddRef'ed and
// HXR_OK is returned; for any other interface *ppInterfaceObj is set to
// NULL and HXR_NOINTERFACE is returned.
STDMETHODIMP
CPurePlaySource::CRTCPSendStatusCallback::QueryInterface
(
 REFIID interfaceID,
 void** ppInterfaceObj
 )
{
    void* pInterface = NULL;

    // By definition all COM objects support the IUnknown interface
    if (IsEqualIID(interfaceID, IID_IUnknown))
    {
        pInterface = (IUnknown*)this;
    }
    else if (IsEqualIID(interfaceID, IID_IHXCallback))
    {
        pInterface = (IHXCallback*)this;
    }

    if (!pInterface)
    {
        // No other interfaces are supported
        *ppInterfaceObj = NULL;
        return HXR_NOINTERFACE;
    }

    AddRef();
    *ppInterfaceObj = pInterface;
    return HXR_OK;
}

// Scheduler callback: clear the owner's pending-callback handle and pass
// the stored event to its OnExpire().  Does nothing when there is no owner.
// Always returns HXR_OK.
STDMETHODIMP
CPurePlaySource::CRTCPSendStatusCallback::Func()
{
    if (!m_pStream)
    {
        return HXR_OK;
    }

    // The handle is zeroed before dispatching -- presumably so OnExpire()
    // can schedule a fresh callback without seeing a stale one.
    m_pStream->m_hStatusCallbackID = 0;
    m_pStream->OnExpire(m_lEvent);

    return HXR_OK;
}



Generated by  Doxygen 1.6.0   Back to index