Changeset 2318

Show
Ignore:
Timestamp:
03/10/2008 23:22:36 (2 years ago)
Author:
chris
Message:

Remove Win32 command socket thread, as it has caused too much trouble.

Handle command socket on Win32 the same as all other platforms, removing
#ifdefs from BackupDaemon?.

Will replace this thread with regular but not excessive command socket
polling using timers in future.

Change error messages when command socket comms fail to make them clearer.

Location:
box/trunk
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • box/trunk/bin/bbackupd/BackupDaemon.cpp

    r2302 r2318  
    9393#define         SYNC_PERIOD_RANDOM_EXTRA_TIME_SHIFT_BY  6 
    9494 
    95 #ifdef WIN32 
    96 // -------------------------------------------------------------------------- 
    97 // 
    98 // Function 
    99 //              Name:    HelperThread() 
    100 //              Purpose: Background thread function, called by Windows, 
    101 //                      calls the BackupDaemon's RunHelperThread method 
    102 //                      to listen for and act on control communications 
    103 //              Created: 18/2/04 
    104 // 
    105 // -------------------------------------------------------------------------- 
    106 unsigned int WINAPI HelperThread(LPVOID lpParam)  
    107 {  
    108         ((BackupDaemon *)lpParam)->RunHelperThread(); 
    109  
    110         return 0; 
    111 } 
    112 #endif 
    113  
    11495// -------------------------------------------------------------------------- 
    11596// 
     
    123104        : mState(BackupDaemon::State_Initialising), 
    124105          mDeleteRedundantLocationsAfter(0), 
    125           mpCommandSocketInfo(0), 
    126106          mLastNotifiedEvent(SysadminNotifier::MAX), 
    127107          mDeleteUnusedRootDirEntriesAfter(0), 
     
    149129        // Only ever one instance of a daemon 
    150130        SSLLib::Initialise(); 
    151          
    152         #ifdef WIN32 
    153                 // Create the event object to signal from main thread to 
    154                 // worker when new messages are queued to be sent to the 
    155                 // command socket. 
    156  
    157                 mhMessageToSendEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 
    158                 if(mhMessageToSendEvent == INVALID_HANDLE_VALUE) 
    159                 { 
    160                         BOX_ERROR("Failed to create event object: error " << 
    161                                 GetLastError()); 
    162                         exit(1); 
    163                 } 
    164  
    165                 // Create the event object to signal from worker to main thread 
    166                 // when a command has been received on the command socket. 
    167  
    168                 mhCommandReceivedEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 
    169                 if(mhCommandReceivedEvent == INVALID_HANDLE_VALUE) 
    170                 { 
    171                         BOX_ERROR("Failed to create event object: error " << 
    172                                 GetLastError()); 
    173                         exit(1); 
    174                 } 
    175  
    176                 // Create the critical section to protect the message queue 
    177                 InitializeCriticalSection(&mMessageQueueLock); 
    178         #endif 
    179131} 
    180132 
     
    191143        DeleteAllLocations(); 
    192144        DeleteAllIDMaps(); 
    193          
    194         if(mpCommandSocketInfo != 0) 
    195         { 
    196                 delete mpCommandSocketInfo; 
    197                 mpCommandSocketInfo = 0; 
    198         } 
    199145} 
    200146 
     
    385331        return returnCode; 
    386332} 
    387  
    388 void BackupDaemon::RunHelperThread(void) 
    389 { 
    390         const Configuration &conf(GetConfiguration()); 
    391         mpCommandSocketInfo = new CommandSocketInfo; 
    392         WinNamedPipeStream& rSocket(mpCommandSocketInfo->mListeningSocket); 
    393  
    394         // loop until the parent process exits, or we decide 
    395         // to kill the thread ourselves 
    396         while (!IsTerminateWanted()) 
    397         { 
    398                 try 
    399                 { 
    400                         std::string socket = conf.GetKeyValue("CommandSocket"); 
    401                         rSocket.Accept(socket); 
    402                 } 
    403                 catch (BoxException &e) 
    404                 { 
    405                         BOX_ERROR("Failed to open command socket: " <<  
    406                                 e.what()); 
    407                         SetTerminateWanted(); 
    408                         break; // this is fatal to listening thread 
    409                 } 
    410                 catch(std::exception &e) 
    411                 { 
    412                         BOX_ERROR("Failed to open command socket: " << 
    413                                 e.what()); 
    414                         SetTerminateWanted(); 
    415                         break; // this is fatal to listening thread 
    416                 } 
    417                 catch(...) 
    418                 { 
    419                         BOX_ERROR("Failed to open command socket: " 
    420                                 "unknown error"); 
    421                         SetTerminateWanted(); 
    422                         break; // this is fatal to listening thread 
    423                 } 
    424  
    425                 try 
    426                 { 
    427                         // Errors here do not kill the thread, 
    428                         // only the current connection. 
    429  
    430                         // This next section comes from Ben's original function 
    431                         // Log 
    432                         BOX_INFO("Connection from command socket"); 
    433  
    434                         // Send a header line summarising the configuration  
    435                         // and current state 
    436                         char summary[256]; 
    437                         size_t summarySize = sprintf(summary,  
    438                                 "bbackupd: %d %d %d %d\nstate %d\n", 
    439                                 conf.GetKeyValueBool("AutomaticBackup"), 
    440                                 conf.GetKeyValueInt("UpdateStoreInterval"), 
    441                                 conf.GetKeyValueInt("MinimumFileAge"), 
    442                                 conf.GetKeyValueInt("MaxUploadWait"), 
    443                                 mState); 
    444  
    445                         rSocket.Write(summary, summarySize); 
    446                         rSocket.Write("ping\n", 5); 
    447  
    448                         // old queued messages are not useful 
    449                         EnterCriticalSection(&mMessageQueueLock); 
    450                         mMessageList.clear(); 
    451                         ResetEvent(mhMessageToSendEvent); 
    452                         LeaveCriticalSection(&mMessageQueueLock); 
    453  
    454                         IOStreamGetLine readLine(rSocket); 
    455                         std::string command; 
    456  
    457                         while (rSocket.IsConnected() && !IsTerminateWanted()) 
    458                         { 
    459                                 HANDLE handles[2]; 
    460                                 handles[0] = mhMessageToSendEvent; 
    461                                 handles[1] = rSocket.GetReadableEvent(); 
    462                                  
    463                                 DWORD result = WaitForMultipleObjects( 
    464                                         sizeof(handles)/sizeof(*handles), 
    465                                         handles, FALSE, 1000); 
    466  
    467                                 if(result == 0) 
    468                                 { 
    469                                         ResetEvent(mhMessageToSendEvent); 
    470  
    471                                         EnterCriticalSection(&mMessageQueueLock); 
    472                                         try 
    473                                         { 
    474                                                 while (mMessageList.size() > 0) 
    475                                                 { 
    476                                                         std::string message = *(mMessageList.begin()); 
    477                                                         mMessageList.erase(mMessageList.begin()); 
    478                                                         BOX_TRACE("Sending '" << message << "' to waiting client"); 
    479                                                         message += "\n"; 
    480                                                         rSocket.Write(message.c_str(), 
    481                                                                 message.length()); 
    482                                                 } 
    483                                         } 
    484                                         catch (...) 
    485                                         { 
    486                                                 LeaveCriticalSection(&mMessageQueueLock); 
    487                                                 throw; 
    488                                         } 
    489                                         LeaveCriticalSection(&mMessageQueueLock); 
    490                                         continue; 
    491                                 } 
    492                                 else if(result == WAIT_TIMEOUT) 
    493                                 { 
    494                                         continue; 
    495                                 } 
    496                                 else if(result != 1) 
    497                                 { 
    498                                         BOX_ERROR("WaitForMultipleObjects " 
    499                                                 "returned invalid result " << 
    500                                                 result); 
    501                                         continue; 
    502                                 } 
    503  
    504                                 if(!readLine.GetLine(command)) 
    505                                 { 
    506                                         BOX_ERROR("Failed to read line"); 
    507                                         continue; 
    508                                 } 
    509  
    510                                 BOX_INFO("Received command '" << command <<  
    511                                         "' over command socket"); 
    512  
    513                                 bool sendOK = false; 
    514                                 bool sendResponse = true; 
    515                                 bool disconnect = false; 
    516  
    517                                 // Command to process! 
    518                                 if(command == "quit" || command == "") 
    519                                 { 
    520                                         // Close the socket. 
    521                                         disconnect = true; 
    522                                         sendResponse = false; 
    523                                 } 
    524                                 else if(command == "sync") 
    525                                 { 
    526                                         // Sync now! 
    527                                         this->mDoSyncFlagOut = true; 
    528                                         this->mSyncIsForcedOut = false; 
    529                                         sendOK = true; 
    530                                         SetEvent(mhCommandReceivedEvent); 
    531                                 } 
    532                                 else if(command == "force-sync") 
    533                                 { 
    534                                         // Sync now (forced -- overrides any SyncAllowScript) 
    535                                         this->mDoSyncFlagOut = true; 
    536                                         this->mSyncIsForcedOut = true; 
    537                                         sendOK = true; 
    538                                         SetEvent(mhCommandReceivedEvent); 
    539                                 } 
    540                                 else if(command == "reload") 
    541                                 { 
    542                                         // Reload the configuration 
    543                                         SetReloadConfigWanted(); 
    544                                         sendOK = true; 
    545                                         SetEvent(mhCommandReceivedEvent); 
    546                                 } 
    547                                 else if(command == "terminate") 
    548                                 { 
    549                                         // Terminate the daemon cleanly 
    550                                         SetTerminateWanted(); 
    551                                         sendOK = true; 
    552                                         SetEvent(mhCommandReceivedEvent); 
    553                                 } 
    554                                 else 
    555                                 { 
    556                                         BOX_ERROR("Received unknown command " 
    557                                                 "'" << command << "' " 
    558                                                 "over command socket"); 
    559                                         sendResponse = true; 
    560                                         sendOK = false; 
    561                                 } 
    562  
    563                                 // Send a response back? 
    564                                 if(sendResponse) 
    565                                 { 
    566                                         const char* response = 
    567                                                 sendOK ? "ok\n" : "error\n"; 
    568                                         rSocket.Write(response, 
    569                                                 strlen(response)); 
    570                                 } 
    571  
    572                                 if(disconnect)  
    573                                 { 
    574                                         break; 
    575                                 } 
    576                         } 
    577  
    578                         rSocket.Close(); 
    579                 } 
    580                 catch(BoxException &e) 
    581                 { 
    582                         BOX_ERROR("Communication error with " 
    583                                 "control client: " << e.what()); 
    584                 } 
    585                 catch(std::exception &e) 
    586                 { 
    587                         BOX_ERROR("Internal error in command socket " 
    588                                 "thread: " << e.what()); 
    589                 } 
    590                 catch(...) 
    591                 { 
    592                         BOX_ERROR("Communication error with control client"); 
    593                 } 
    594         } 
    595  
    596         CloseHandle(mhCommandReceivedEvent); 
    597         CloseHandle(mhMessageToSendEvent); 
    598 }  
    599333#endif 
    600334 
     
    612346        Timers::Init(); 
    613347         
    614         #ifdef WIN32 
    615                 // Create a thread to handle the named pipe 
    616                 HANDLE hThread; 
    617                 unsigned int dwThreadId; 
    618  
    619                 hThread = (HANDLE) _beginthreadex(  
    620                         NULL,         // default security attributes  
    621                         0,            // use default stack size   
    622                         HelperThread, // thread function  
    623                         this,         // argument to thread function  
    624                         0,            // use default creation flags  
    625                         &dwThreadId); // returns the thread identifier  
    626         #else 
     348        #ifndef WIN32 
    627349                // Ignore SIGPIPE so that if a command connection is broken, 
    628350                // the daemon doesn't terminate. 
    629351                ::signal(SIGPIPE, SIG_IGN); 
    630  
    631                 // Create a command socket? 
    632                 const Configuration &conf(GetConfiguration()); 
    633                 if(conf.KeyExists("CommandSocket")) 
    634                 { 
    635                         // Yes, create a local UNIX socket 
    636                         mpCommandSocketInfo = new CommandSocketInfo; 
    637                         const char *socketName = 
    638                                 conf.GetKeyValue("CommandSocket").c_str(); 
     352        #endif 
     353 
     354        // Create a command socket? 
     355        const Configuration &conf(GetConfiguration()); 
     356        if(conf.KeyExists("CommandSocket")) 
     357        { 
     358                // Yes, create a local UNIX socket 
     359                mapCommandSocketInfo.reset(new CommandSocketInfo); 
     360                const char *socketName = 
     361                        conf.GetKeyValue("CommandSocket").c_str(); 
     362                #ifdef WIN32 
     363                        mapCommandSocketInfo->mListeningSocket.Listen( 
     364                                socketName); 
     365                #else 
    639366                        ::unlink(socketName); 
    640                         mpCommandSocketInfo->mListeningSocket.Listen( 
     367                        mapCommandSocketInfo->mListeningSocket.Listen( 
    641368                                Socket::TypeUNIX, socketName); 
    642                 } 
    643         #endif // !WIN32 
     369                #endif 
     370        } 
    644371 
    645372        // Handle things nicely on exceptions 
     
    650377        catch(...) 
    651378        { 
    652                 #ifdef WIN32 
    653                         // Don't delete the socket, as the helper thread 
    654                         // is probably still using it. Let Windows clean 
    655                         // up after us. 
    656                 #else 
    657                 if(mpCommandSocketInfo != 0) 
     379                if(mapCommandSocketInfo.get()) 
    658380                { 
    659381                        try  
    660382                        { 
    661                                 delete mpCommandSocketInfo; 
     383                                mapCommandSocketInfo.reset(); 
    662384                        } 
    663385                        catch(std::exception &e) 
     
    672394                                        "after exception, ignored."); 
    673395                        } 
    674                         mpCommandSocketInfo = 0; 
    675                 } 
    676                 #endif // WIN32 
     396                } 
    677397 
    678398                Timers::Cleanup(); 
     
    681401        } 
    682402 
    683         #ifndef WIN32 
    684                 // Clean up 
    685                 if(mpCommandSocketInfo != 0) 
    686                 { 
    687                         delete mpCommandSocketInfo; 
    688                         mpCommandSocketInfo = 0; 
    689                 } 
    690         #endif 
    691          
     403        // Clean up 
     404        mapCommandSocketInfo.reset(); 
    692405        Timers::Cleanup(); 
    693406} 
     
    793506                                // depending on the state of the  
    794507                                // control connection 
    795                                 if(mpCommandSocketInfo != 0) 
     508                                if(mapCommandSocketInfo.get() != 0) 
    796509                                { 
    797510                                        // A command socket exists,  
     
    881594        catch(std::exception &e) 
    882595        { 
    883                 BOX_ERROR("Internal error during " 
    884                         "backup run: " << e.what()); 
     596                BOX_ERROR("Internal error during backup run: " << e.what()); 
    885597                errorOccurred = true; 
    886598                errorString = e.what(); 
     
    13841096void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFlagOut, bool &SyncIsForcedOut) 
    13851097{ 
    1386 #ifdef WIN32 
    1387         DWORD requiredDelayMs = BoxTimeToMilliSeconds(RequiredDelay); 
    1388  
    1389         DWORD result = WaitForSingleObject(mhCommandReceivedEvent,  
    1390                 (DWORD)requiredDelayMs); 
    1391  
    1392         if(result == WAIT_OBJECT_0) 
    1393         { 
    1394                 DoSyncFlagOut = this->mDoSyncFlagOut; 
    1395                 SyncIsForcedOut = this->mSyncIsForcedOut; 
    1396                 ResetEvent(mhCommandReceivedEvent); 
    1397         } 
    1398         else if(result == WAIT_TIMEOUT) 
    1399         { 
    1400                 DoSyncFlagOut = false; 
    1401                 SyncIsForcedOut = false; 
    1402         } 
    1403         else 
    1404         { 
    1405                 BOX_ERROR("Unexpected result from WaitForSingleObject: " 
    1406                         "error " << GetLastError()); 
    1407         } 
    1408  
    1409         return; 
    1410 #else // ! WIN32 
    1411         ASSERT(mpCommandSocketInfo != 0); 
    1412         if(mpCommandSocketInfo == 0) {::sleep(1); return;} // failure case isn't too bad 
     1098        ASSERT(mapCommandSocketInfo.get()); 
     1099        if(!mapCommandSocketInfo.get()) 
     1100        { 
     1101                // failure case isn't too bad 
     1102                ::sleep(1); 
     1103                return; 
     1104        } 
    14131105         
    14141106        BOX_TRACE("Wait on command socket, delay = " << RequiredDelay); 
     
    14231115 
    14241116                // Wait for socket connection, or handle a command? 
    1425                 if(mpCommandSocketInfo->mpConnectedSocket.get() == 0) 
     1117                if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
    14261118                { 
    14271119                        // No connection, listen for a new one 
    1428                         mpCommandSocketInfo->mpConnectedSocket.reset(mpCommandSocketInfo->mListeningSocket.Accept(timeout).release()); 
     1120                        mapCommandSocketInfo->mpConnectedSocket.reset(mapCommandSocketInfo->mListeningSocket.Accept(timeout).release()); 
    14291121                         
    1430                         if(mpCommandSocketInfo->mpConnectedSocket.get() == 0) 
     1122                        if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
    14311123                        { 
    14321124                                // If a connection didn't arrive, there was a timeout, which means we've 
     
    14471139                                        uid_t remoteEUID = 0xffff; 
    14481140                                        gid_t remoteEGID = 0xffff; 
    1449                                         if(mpCommandSocketInfo->mpConnectedSocket->GetPeerCredentials(remoteEUID, remoteEGID)) 
     1141                                        if(mapCommandSocketInfo->mpConnectedSocket->GetPeerCredentials(remoteEUID, remoteEGID)) 
    14501142                                        { 
    14511143                                                // Credentials are available -- check UID 
     
    14641156                                        // Dump the connection 
    14651157                                        BOX_ERROR("Incoming command connection from peer had different user ID than this process, or security check could not be completed."); 
    1466                                         mpCommandSocketInfo->mpConnectedSocket.reset(); 
     1158                                        mapCommandSocketInfo->mpConnectedSocket.reset(); 
    14671159                                        return; 
    14681160                                } 
     
    14811173                                                conf.GetKeyValueInt("MaxUploadWait"), 
    14821174                                                mState); 
    1483                                         mpCommandSocketInfo->mpConnectedSocket->Write(summary, summarySize); 
     1175                                        mapCommandSocketInfo->mpConnectedSocket->Write(summary, summarySize); 
    14841176                                         
    14851177                                        // Set the timeout to something very small, so we don't wait too long on waiting 
     
    14911183 
    14921184                // So there must be a connection now. 
    1493                 ASSERT(mpCommandSocketInfo->mpConnectedSocket.get() != 0); 
     1185                ASSERT(mapCommandSocketInfo->mpConnectedSocket.get() != 0); 
    14941186                 
    14951187                // Is there a getline object ready? 
    1496                 if(mpCommandSocketInfo->mpGetLine == 0) 
     1188                if(mapCommandSocketInfo->mpGetLine == 0) 
    14971189                { 
    14981190                        // Create a new one 
    1499                         mpCommandSocketInfo->mpGetLine = new IOStreamGetLine(*(mpCommandSocketInfo->mpConnectedSocket.get())); 
     1191                        mapCommandSocketInfo->mpGetLine = new IOStreamGetLine(*(mapCommandSocketInfo->mpConnectedSocket.get())); 
    15001192                } 
    15011193                 
    15021194                // Ping the remote side, to provide errors which will mean the socket gets closed 
    1503                 mpCommandSocketInfo->mpConnectedSocket->Write("ping\n", 5); 
     1195                mapCommandSocketInfo->mpConnectedSocket->Write("ping\n", 5); 
    15041196                 
    15051197                // Wait for a command or something on the socket 
    15061198                std::string command; 
    1507                 while(mpCommandSocketInfo->mpGetLine != 0 && !mpCommandSocketInfo->mpGetLine->IsEOF() 
    1508                         && mpCommandSocketInfo->mpGetLine->GetLine(command, false /* no preprocessing */, timeout)) 
     1199                while(mapCommandSocketInfo->mpGetLine != 0 && !mapCommandSocketInfo->mpGetLine->IsEOF() 
     1200                        && mapCommandSocketInfo->mpGetLine->GetLine(command, false /* no preprocessing */, timeout)) 
    15091201                { 
    15101202                        BOX_TRACE("Receiving command '" << command  
     
    15511243                        if(sendResponse) 
    15521244                        { 
    1553                                 mpCommandSocketInfo->mpConnectedSocket->Write(sendOK?"ok\n":"error\n", sendOK?3:6); 
     1245                                mapCommandSocketInfo->mpConnectedSocket->Write(sendOK?"ok\n":"error\n", sendOK?3:6); 
    15541246                        } 
    15551247                         
     
    15591251                 
    15601252                // Close on EOF? 
    1561                 if(mpCommandSocketInfo->mpGetLine != 0 && mpCommandSocketInfo->mpGetLine->IsEOF()) 
     1253                if(mapCommandSocketInfo->mpGetLine != 0 && mapCommandSocketInfo->mpGetLine->IsEOF()) 
    15621254                { 
    15631255                        CloseCommandConnection(); 
    15641256                } 
    15651257        } 
    1566         catch(std::exception &e) 
    1567         { 
    1568                 BOX_ERROR("Internal error in command socket thread: " 
    1569                         << e.what()); 
    1570                 // If an error occurs, and there is a connection active, just close that 
    1571                 // connection and continue. Otherwise, let the error propagate. 
    1572                 if(mpCommandSocketInfo->mpConnectedSocket.get() == 0) 
     1258        catch(ConnectionException &ce) 
     1259        { 
     1260                BOX_NOTICE("Failed to write to command socket: " << ce.what()); 
     1261 
     1262                // If an error occurs, and there is a connection active, 
     1263                // just close that connection and continue. Otherwise, 
     1264                // let the error propagate. 
     1265 
     1266                if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
    15731267                { 
    15741268                        throw; // thread will die 
     
    15801274                } 
    15811275        } 
    1582         catch(...) 
    1583         { 
    1584                 // If an error occurs, and there is a connection active, just close that 
    1585                 // connection and continue. Otherwise, let the error propagate. 
    1586                 if(mpCommandSocketInfo->mpConnectedSocket.get() == 0) 
     1276        catch(std::exception &e) 
     1277        { 
     1278                BOX_ERROR("Failed to write to command socket: " << 
     1279                        e.what()); 
     1280 
     1281                // If an error occurs, and there is a connection active, 
     1282                // just close that connection and continue. Otherwise, 
     1283                // let the error propagate. 
     1284 
     1285                if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
    15871286                { 
    15881287                        throw; // thread will die 
     
    15941293                } 
    15951294        } 
    1596 #endif // WIN32 
     1295        catch(...) 
     1296        { 
     1297                BOX_ERROR("Failed to write to command socket: unknown error"); 
     1298 
     1299                // If an error occurs, and there is a connection active, 
     1300                // just close that connection and continue. Otherwise, 
     1301                // let the error propagate. 
     1302 
     1303                if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
     1304                { 
     1305                        throw; // thread will die 
     1306                } 
     1307                else 
     1308                { 
     1309                        // Close socket and ignore error 
     1310                        CloseCommandConnection(); 
     1311                } 
     1312        } 
    15971313} 
    15981314 
     
    16081324void BackupDaemon::CloseCommandConnection() 
    16091325{ 
    1610 #ifndef WIN32 
    16111326        try 
    16121327        { 
    16131328                BOX_TRACE("Closing command connection"); 
    16141329                 
    1615                 if(mpCommandSocketInfo->mpGetLine) 
    1616                 { 
    1617                         delete mpCommandSocketInfo->mpGetLine; 
    1618                         mpCommandSocketInfo->mpGetLine = 0; 
    1619                 } 
    1620                 mpCommandSocketInfo->mpConnectedSocket.reset(); 
     1330                if(mapCommandSocketInfo->mpGetLine) 
     1331                { 
     1332                        delete mapCommandSocketInfo->mpGetLine; 
     1333                        mapCommandSocketInfo->mpGetLine = 0; 
     1334                } 
     1335                mapCommandSocketInfo->mpConnectedSocket.reset(); 
    16211336        } 
    16221337        catch(std::exception &e) 
     
    16291344                // Ignore any errors 
    16301345        } 
    1631 #endif 
    16321346} 
    16331347 
     
    16471361        // may never change if the server doesn't need to be contacted. 
    16481362 
    1649         if(mpCommandSocketInfo != NULL && 
    1650 #ifdef WIN32 
    1651             mpCommandSocketInfo->mListeningSocket.IsConnected() 
    1652 #else 
    1653             mpCommandSocketInfo->mpConnectedSocket.get() != 0 
    1654 #endif 
    1655             ) 
     1363        if(mapCommandSocketInfo.get() && 
     1364                mapCommandSocketInfo->mpConnectedSocket.get() != 0) 
    16561365        { 
    16571366                std::string message = SendStart ? "start-sync" : "finish-sync"; 
    16581367                try 
    16591368                { 
    1660 #ifdef WIN32 
    1661                         EnterCriticalSection(&mMessageQueueLock); 
    1662                         mMessageList.push_back(message); 
    1663                         SetEvent(mhMessageToSendEvent); 
    1664                         LeaveCriticalSection(&mMessageQueueLock); 
    1665 #else 
    16661369                        message += "\n"; 
    1667                         mpCommandSocketInfo->mpConnectedSocket->Write( 
     1370                        mapCommandSocketInfo->mpConnectedSocket->Write( 
    16681371                                message.c_str(), message.size()); 
    1669 #endif 
    16701372                } 
    16711373                catch(std::exception &e) 
     
    23552057        std::string message = newState; 
    23562058 
    2357 #ifdef WIN32 
    2358         EnterCriticalSection(&mMessageQueueLock); 
    2359         mMessageList.push_back(newState); 
    2360         SetEvent(mhMessageToSendEvent); 
    2361         LeaveCriticalSection(&mMessageQueueLock); 
    2362 #else 
    23632059        message += "\n"; 
    23642060 
    2365         if(mpCommandSocketInfo == 0) 
     2061        if(!mapCommandSocketInfo.get()) 
    23662062        { 
    23672063                return; 
    23682064        } 
    23692065 
    2370         if(mpCommandSocketInfo->mpConnectedSocket.get() == 0) 
     2066        if(mapCommandSocketInfo->mpConnectedSocket.get() == 0) 
    23712067        { 
    23722068                return; 
     
    23762072        try 
    23772073        { 
    2378                 mpCommandSocketInfo->mpConnectedSocket->Write(message.c_str(), 
     2074                mapCommandSocketInfo->mpConnectedSocket->Write(message.c_str(), 
    23792075                        message.length()); 
    23802076        } 
     2077        catch(ConnectionException &ce) 
     2078        { 
     2079                BOX_NOTICE("Failed to write state to command socket: " << 
     2080                        ce.what()); 
     2081                CloseCommandConnection(); 
     2082        } 
    23812083        catch(std::exception &e) 
    23822084        { 
    2383                 BOX_ERROR("Internal error while writing state " 
    2384                         "to command socket: " << e.what()); 
     2085                BOX_ERROR("Failed to write state to command socket: " << 
     2086                        e.what()); 
    23852087                CloseCommandConnection(); 
    23862088        } 
    23872089        catch(...) 
    23882090        { 
    2389                 BOX_ERROR("Internal error while writing state " 
    2390                         "to command socket: unknown error"); 
     2091                BOX_ERROR("Failed to write state to command socket: " 
     2092                        "unknown error"); 
    23912093                CloseCommandConnection(); 
    23922094        } 
    2393 #endif 
    23942095} 
    23952096 
  • box/trunk/bin/bbackupd/BackupDaemon.h

    r2302 r2318  
    2828 
    2929#ifdef WIN32 
     30        #include "WinNamedPipeListener.h" 
    3031        #include "WinNamedPipeStream.h" 
    3132#endif 
     
    194195        public: 
    195196#ifdef WIN32 
    196                 WinNamedPipeStream mListeningSocket; 
     197                WinNamedPipeListener<1 /* listen backlog */> mListeningSocket; 
     198                std::auto_ptr<WinNamedPipeStream> mpConnectedSocket; 
    197199#else 
    198200                SocketListen<SocketStream, 1 /* listen backlog */> mListeningSocket; 
     
    203205         
    204206        // Using a socket? 
    205         CommandSocketInfo *mpCommandSocketInfo; 
     207        std::auto_ptr<CommandSocketInfo> mapCommandSocketInfo; 
    206208         
    207209        // Stop notifications being repeated. 
     
    504506 
    505507#ifdef WIN32 
    506         public: 
    507         void RunHelperThread(void); 
    508  
    509508        private: 
    510         bool mDoSyncFlagOut, mSyncIsForcedOut; 
    511509        bool mInstallService, mRemoveService, mRunAsService; 
    512510        std::string mServiceName; 
    513         HANDLE mhMessageToSendEvent, mhCommandReceivedEvent; 
    514         CRITICAL_SECTION mMessageQueueLock; 
    515         std::vector<std::string> mMessageList; 
    516511#endif 
    517512}; 
  • box/trunk/lib/server/WinNamedPipeStream.cpp

    r2106 r2318  
    4545          mIsServer(false), 
    4646          mIsConnected(false) 
    47 { 
     47{ } 
     48 
     49// -------------------------------------------------------------------------- 
     50// 
     51// Function 
     52//              Name:    WinNamedPipeStream::WinNamedPipeStream(HANDLE) 
     53//              Purpose: Constructor (with already-connected pipe handle) 
     54//              Created: 2008/10/01 
     55// 
     56// -------------------------------------------------------------------------- 
     57WinNamedPipeStream::WinNamedPipeStream(HANDLE hNamedPipe) 
     58        : mSocketHandle(hNamedPipe), 
     59          mReadableEvent(INVALID_HANDLE_VALUE), 
     60          mBytesInBuffer(0), 
     61          mReadClosed(false), 
     62          mWriteClosed(false), 
     63          mIsServer(true), 
     64          mIsConnected(true) 
     65{  
     66        // create the Readable event 
     67        mReadableEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 
     68 
     69        if (mReadableEvent == INVALID_HANDLE_VALUE) 
     70        { 
     71                BOX_ERROR("Failed to create the Readable event: " << 
     72                        GetErrorMessage(GetLastError())); 
     73                Close(); 
     74                THROW_EXCEPTION(CommonException, Internal) 
     75        } 
     76 
     77        // initialise the OVERLAPPED structure 
     78        memset(&mReadOverlap, 0, sizeof(mReadOverlap)); 
     79        mReadOverlap.hEvent = mReadableEvent; 
     80 
     81        // start the first overlapped read 
     82        if (!ReadFile(mSocketHandle, mReadBuffer, sizeof(mReadBuffer), 
     83                NULL, &mReadOverlap)) 
     84        { 
     85                DWORD err = GetLastError(); 
     86 
     87                if (err != ERROR_IO_PENDING) 
     88                { 
     89                        BOX_ERROR("Failed to start overlapped read: " << 
     90                                GetErrorMessage(err)); 
     91                        Close(); 
     92                        THROW_EXCEPTION(ConnectionException,  
     93                                Conn_SocketReadError) 
     94                } 
     95        } 
    4896} 
    4997 
     
    81129// 
    82130// -------------------------------------------------------------------------- 
    83 void WinNamedPipeStream::Accept(const std::string& rName) 
    84 { 
    85         if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected)  
    86         { 
    87                 THROW_EXCEPTION(ServerException, SocketAlreadyOpen) 
    88         } 
    89  
    90         std::string socket = sPipeNamePrefix + rName; 
    91  
    92         mSocketHandle = CreateNamedPipeA(  
    93                 socket.c_str(),            // pipe name  
    94                 PIPE_ACCESS_DUPLEX |       // read/write access  
    95                 FILE_FLAG_OVERLAPPED,      // enabled overlapped I/O 
    96                 PIPE_TYPE_BYTE |           // message type pipe  
    97                 PIPE_READMODE_BYTE |       // message-read mode  
    98                 PIPE_WAIT,                 // blocking mode  
    99                 1,                         // max. instances   
    100                 4096,                      // output buffer size  
    101                 4096,                      // input buffer size  
    102                 NMPWAIT_USE_DEFAULT_WAIT,  // client time-out  
    103                 NULL);                     // default security attribute  
    104  
     131/* 
     132void WinNamedPipeStream::Accept() 
     133{ 
    105134        if (mSocketHandle == INVALID_HANDLE_VALUE) 
    106135        { 
    107                 BOX_ERROR("Failed to CreateNamedPipeA(" << socket << "): " << 
    108                         GetErrorMessage(GetLastError())); 
    109                 THROW_EXCEPTION(ServerException, SocketOpenError) 
     136                THROW_EXCEPTION(ServerException, BadSocketHandle); 
     137        } 
     138 
     139        if (mIsConnected)  
     140        { 
     141                THROW_EXCEPTION(ServerException, SocketAlreadyOpen); 
    110142        } 
    111143 
     
    157189        } 
    158190} 
     191*/ 
    159192 
    160193// -------------------------------------------------------------------------- 
     
    218251{ 
    219252        // TODO no support for timeouts yet 
    220         if (Timeout != IOStream::TimeOutInfinite) 
     253        if (!mIsServer && Timeout != IOStream::TimeOutInfinite) 
    221254        { 
    222255                THROW_EXCEPTION(CommonException, AssertFailed) 
     
    250283                        // overlapped I/O completed successfully?  
    251284                        // (wait if needed) 
    252  
    253                         if (GetOverlappedResult(mSocketHandle, 
     285                        DWORD waitResult = WaitForSingleObject( 
     286                                mReadOverlap.hEvent, Timeout); 
     287 
     288                        if (waitResult == WAIT_ABANDONED) 
     289                        { 
     290                                BOX_ERROR("Wait for command socket read " 
     291                                        "abandoned by system"); 
     292                                THROW_EXCEPTION(ServerException, 
     293                                        BadSocketHandle); 
     294                        } 
     295                        else if (waitResult == WAIT_TIMEOUT) 
     296                        { 
     297                                // wait timed out, nothing to read 
     298                                NumBytesRead = 0; 
     299                        } 
     300                        else if (waitResult != WAIT_OBJECT_0) 
     301                        { 
     302                                BOX_ERROR("Failed to wait for command " 
     303                                        "socket read: unknown result " << 
     304                                        waitResult); 
     305                        } 
     306                        // object is ready to read from 
     307                        else if (GetOverlappedResult(mSocketHandle, 
    254308                                &mReadOverlap, &NumBytesRead, TRUE)) 
    255309                        { 
     
    268322                                        if (err == ERROR_BROKEN_PIPE) 
    269323                                        { 
    270                                                 BOX_ERROR("Control client " 
     324                                                BOX_NOTICE("Control client " 
    271325                                                        "disconnected"); 
    272326                                        } 
     
    343397                        } 
    344398                } 
    345  
    346                 // If the read succeeded immediately, leave the event  
    347                 // signaled, so that we will be called again to process  
    348                 // the newly read data and start another overlapped read. 
    349                 if (needAnotherRead && !mReadClosed) 
    350                 { 
    351                         // leave signalled 
    352                 } 
    353                 else if (!needAnotherRead && mBytesInBuffer > 0) 
    354                 { 
    355                         // leave signalled 
    356                 } 
    357                 else 
    358                 { 
    359                         // nothing left to read, reset the event 
    360                         ResetEvent(mReadableEvent); 
    361                         // FIXME: a pending read could have signalled 
    362                         // the event (again) while we were busy reading. 
    363                         // that signal would be lost, and the reading 
    364                         // thread would block. Should be pretty obvious 
    365                         // if this happens in practice: control client 
    366                         // hangs. 
    367                 } 
    368399        } 
    369400        else 
     
    442473                { 
    443474                        // ERROR_NO_DATA is a strange name for  
    444                         // "The pipe is being closed". No exception wanted. 
     475                        // "The pipe is being closed". 
    445476 
    446477                        DWORD err = GetLastError(); 
     
    454485                        Close(); 
    455486 
    456                         if (err == ERROR_NO_DATA)  
    457                         { 
    458                                 return; 
    459                         } 
    460                         else 
    461                         { 
    462                                 THROW_EXCEPTION(ConnectionException,  
    463                                         Conn_SocketWriteError) 
    464                         } 
     487                        THROW_EXCEPTION(ConnectionException,  
     488                                Conn_SocketWriteError) 
    465489                } 
    466490 
  • box/trunk/lib/server/WinNamedPipeStream.h

    r1877 r2318  
    2525public: 
    2626        WinNamedPipeStream(); 
     27        WinNamedPipeStream(HANDLE hNamedPipe); 
    2728        ~WinNamedPipeStream(); 
    2829 
    2930        // server side - create the named pipe and listen for connections 
    30         void Accept(const std::string& rName); 
     31        // use WinNamedPipeListener to do this instead. 
    3132 
    3233        // client side - connect to a waiting server 
     
    4142        virtual bool StreamDataLeft(); 
    4243        virtual bool StreamClosed(); 
    43         bool IsConnected() { return mIsConnected; } 
    44         HANDLE GetSocketHandle() { return mSocketHandle; } 
    45         HANDLE GetReadableEvent() { return mReadableEvent; } 
    4644 
    4745protected: 
     
    6361        bool mIsConnected; 
    6462 
     63public: 
    6564        static std::string sPipeNamePrefix; 
    6665};