// Example of parsing JSON to document by parts. // Using C++11 threads // Temporarily disable for clang (older version) due to incompatibility with libstdc++ #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__) #include "rapidjson/document.h" #include "rapidjson/error/en.h" #include "rapidjson/writer.h" #include "rapidjson/ostreamwrapper.h" #include #include #include #include using namespace rapidjson; template class AsyncDocumentParser { public: AsyncDocumentParser(Document& d) : stream_(*this) , d_(d) , parseThread_() , mutex_() , notEmpty_() , finish_() , completed_() { // Create and execute thread after all member variables are initialized. parseThread_ = std::thread(&AsyncDocumentParser::Parse, this); } ~AsyncDocumentParser() { if (!parseThread_.joinable()) return; { std::unique_lock lock(mutex_); // Wait until the buffer is read up (or parsing is completed) while (!stream_.Empty() && !completed_) finish_.wait(lock); // Automatically append '\0' as the terminator in the stream. static const char terminator[] = ""; stream_.src_ = terminator; stream_.end_ = terminator + 1; notEmpty_.notify_one(); // unblock the AsyncStringStream } parseThread_.join(); } void ParsePart(const char* buffer, size_t length) { std::unique_lock lock(mutex_); // Wait until the buffer is read up (or parsing is completed) while (!stream_.Empty() && !completed_) finish_.wait(lock); // Stop further parsing if the parsing process is completed. if (completed_) return; // Set the buffer to stream and unblock the AsyncStringStream stream_.src_ = buffer; stream_.end_ = buffer + length; notEmpty_.notify_one(); } private: void Parse() { d_.ParseStream(stream_); // The stream may not be fully read, notify finish anyway to unblock ParsePart() std::unique_lock lock(mutex_); completed_ = true; // Parsing process is completed finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting. } struct AsyncStringStream { typedef char Ch; AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {} char Peek() const { std::unique_lock lock(parser_.mutex_); // If nothing in stream, block to wait. while (Empty()) parser_.notEmpty_.wait(lock); return *src_; } char Take() { std::unique_lock lock(parser_.mutex_); // If nothing in stream, block to wait. while (Empty()) parser_.notEmpty_.wait(lock); count_++; char c = *src_++; // If all stream is read up, notify that the stream is finish. if (Empty()) parser_.finish_.notify_one(); return c; } size_t Tell() const { return count_; } // Not implemented char* PutBegin() { return 0; } void Put(char) {} void Flush() {} size_t PutEnd(char*) { return 0; } bool Empty() const { return src_ == end_; } AsyncDocumentParser& parser_; const char* src_; //!< Current read position. const char* end_; //!< End of buffer size_t count_; //!< Number of characters taken so far. }; AsyncStringStream stream_; Document& d_; std::thread parseThread_; std::mutex mutex_; std::condition_variable notEmpty_; std::condition_variable finish_; bool completed_; }; int main() { Document d; { AsyncDocumentParser<> parser(d); const char json1[] = " { \"hello\" : \"world\", \"t\" : tr"; //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14"; const char json3[] = "16, \"a\":[1, 2, 3, 4] } "; parser.ParsePart(json1, sizeof(json1) - 1); parser.ParsePart(json2, sizeof(json2) - 1); parser.ParsePart(json3, sizeof(json3) - 1); } if (d.HasParseError()) { std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl; return EXIT_FAILURE; } // Stringify the JSON to cout OStreamWrapper os(std::cout); Writer writer(os); d.Accept(writer); std::cout << std::endl; return EXIT_SUCCESS; } #else // Not supporting C++11 #include int main() { std::cout << "This example requires C++11 compiler" << std::endl; } #endif