Commit b9f31cb4 authored by Valentin Platzgummer's avatar Valentin Platzgummer

trying to pin point the thread issue

parent 823c2823
...@@ -50,7 +50,6 @@ MacBuild { ...@@ -50,7 +50,6 @@ MacBuild {
LinuxBuild { LinuxBuild {
CONFIG += qesp_linux_udev CONFIG += qesp_linux_udev
CONFIG += ccache # improves build time
} }
WindowsBuild { WindowsBuild {
...@@ -494,7 +493,6 @@ HEADERS += \ ...@@ -494,7 +493,6 @@ HEADERS += \
src/comm/ros_bridge/include/TopicPublisher.h \ src/comm/ros_bridge/include/TopicPublisher.h \
src/comm/ros_bridge/include/TopicSubscriber.h \ src/comm/ros_bridge/include/TopicSubscriber.h \
src/comm/ros_bridge/include/TypeFactory.h \ src/comm/ros_bridge/include/TypeFactory.h \
src/comm/ros_bridge/src/PackageBuffer.h \
src/comm/utilities.h src/comm/utilities.h
SOURCES += \ SOURCES += \
src/Snake/clipper/clipper.cpp \ src/Snake/clipper/clipper.cpp \
...@@ -514,6 +512,7 @@ SOURCES += \ ...@@ -514,6 +512,7 @@ SOURCES += \
src/Wima/WimaBridge.cc \ src/Wima/WimaBridge.cc \
src/comm/ros_bridge/include/ComPrivateInclude.cpp \ src/comm/ros_bridge/include/ComPrivateInclude.cpp \
src/comm/ros_bridge/include/MessageTag.cpp \ src/comm/ros_bridge/include/MessageTag.cpp \
src/comm/ros_bridge/include/RosBridgeClient.cpp \
src/comm/ros_bridge/include/Server.cpp \ src/comm/ros_bridge/include/Server.cpp \
src/comm/ros_bridge/include/TopicPublisher.cpp \ src/comm/ros_bridge/include/TopicPublisher.cpp \
src/comm/ros_bridge/include/TopicSubscriber.cpp \ src/comm/ros_bridge/include/TopicSubscriber.cpp \
......
This diff is collapsed.
#include "TopicSubscriber.h" #include "TopicSubscriber.h"
#include <thread>
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker, ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
RosbridgeWsClient &rbc) : RosbridgeWsClient &rbc) :
...@@ -56,37 +57,39 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -56,37 +57,39 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
using namespace std::placeholders; using namespace std::placeholders;
auto callbackWrapper = [callback]( auto callbackWrapper = [callback](
std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message){ std::shared_ptr<WsClient::InMessage> in_message)
// Parse document. {
JsonDoc docFull; // Parse document.
docFull.Parse(in_message->string().c_str()); JsonDoc docFull;
if ( docFull.HasParseError() ) { docFull.Parse(in_message->string().c_str());
std::cout << "Json document has parse error: " if ( docFull.HasParseError() ) {
<< in_message->string() std::cout << "Json document has parse error: "
<< std::endl; << in_message->string()
return; << std::endl;
} else if (!docFull.HasMember("msg")) { return;
std::cout << "Json document does not contain a message (\"msg\"): " } else if (!docFull.HasMember("msg")) {
<< in_message->string() std::cout << "Json document does not contain a message (\"msg\"): "
<< std::endl; << in_message->string()
return; << std::endl;
} return;
}
// Extract message and call callback. // Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc()); JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
callback(std::move(pDoc)); callback(std::move(pDoc));
return;
}; };
if ( !_rbc.topicAvailable(topic) ){ if ( !_rbc.topicAvailable(topic) ){
// Wait until topic is available. // Wait until topic is available.
std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl;
std::thread t([this, clientName, topic, callbackWrapper]{ std::thread t([this, clientName, topic, callbackWrapper]{
this->_rbc.waitForTopic(topic, this->_stopped); this->_rbc.waitForTopic(topic, this->_stopped);
if ( !this->_stopped->load() ){ if ( !this->_stopped->load() ){
this->_rbc.addClient(clientName); this->_rbc.addClient(clientName);
this->_rbc.subscribe(clientName, topic, callbackWrapper); this->_rbc.subscribe(clientName, topic, callbackWrapper);
} }
std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
}); });
t.detach(); t.detach();
} else { } else {
......
#pragma once
#include "boost/lockfree/queue.hpp"
#include <functional>
namespace ROSBridge {
namespace Bridge {
namespace lf = ::boost::lockfree;
template <class T>
class PackageBuffer
{
public:
PackageBuffer();
void push(T t) {
buffer.push(t);
if (_pushCallback)
_pushCallback();
}
T pop() {
T temp = buffer.pop();
if (_popCallback)
_popCallback();
return temp;
}
void setPushCallback(std::function<void(void)> pushCallback) {
_pushCallback = pushCallback;
}
void setPopCallback(std::function<void(void)> popCallback) {
_popCallback = popCallback;
}
bool empty() const { return buffer.empty();}
private:
lf::queue<T> buffer;
std::function<void(void)> _popCallback;
std::function<void(void)> _pushCallback;
};
} // namespace Communicator
} // namespace
...@@ -43,14 +43,22 @@ void ROSBridge::ROSBridge::start() ...@@ -43,14 +43,22 @@ void ROSBridge::ROSBridge::start()
void ROSBridge::ROSBridge::reset() void ROSBridge::ROSBridge::reset()
{ {
auto start = std::chrono::high_resolution_clock::now();
std::cout << "ROSBridge::reset: reset publisher" << std::endl; std::cout << "ROSBridge::reset: reset publisher" << std::endl;
_topicPublisher.reset(); _topicPublisher.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: reset subscriber" << std::endl; std::cout << "ROSBridge::reset: reset subscriber" << std::endl;
_topicSubscriber.reset(); _topicSubscriber.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: reset server" << std::endl; std::cout << "ROSBridge::reset: reset server" << std::endl;
_server.reset(); _server.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: _stopped->store(true)" << std::endl; std::cout << "ROSBridge::reset: _stopped->store(true)" << std::endl;
_stopped->store(true); _stopped->store(true);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
auto end = std::chrono::high_resolution_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout << "ROSBridge reset duration: " << delta << " ms" << std::endl;
} }
bool ROSBridge::ROSBridge::ping() bool ROSBridge::ROSBridge::ping()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment