35 #include <px4_platform_common/log.h> 36 #include <px4_platform_common/tasks.h> 42 #define LOG_TAG "uORBKraitFastRpcChannel.cpp" 46 static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics);
65 _ThreadStarted(false),
66 _ThreadShouldExit(false)
74 PX4_DEBUG(
"Before calling TopicAdvertised for [%s]\n", messageName);
76 PX4_DEBUG(
"Response for TopicAdvertised for [%s], rc[%d]\n", messageName, rc);
83 PX4_DEBUG(
"Before calling TopicUnadvertised for [%s]\n", messageName);
85 PX4_DEBUG(
"Response for TopicUnadvertised for [%s], rc[%d]\n", messageName, rc);
191 pthread_attr_t recv_thread_attr;
192 pthread_attr_init(&recv_thread_attr);
194 struct sched_param param;
195 (void)pthread_attr_getschedparam(&recv_thread_attr, &param);
196 param.sched_priority = SCHED_PRIORITY_MAX - 80;
197 (void)pthread_attr_setschedparam(&recv_thread_attr, &param);
199 pthread_attr_setstacksize(&recv_thread_attr, 4096);
202 PX4_ERR(
"Error creating the receive thread for muorb");
205 pthread_setname_np(
_RecvThread,
"muorb_krait_receiver");
208 pthread_attr_destroy(&recv_thread_attr);
223 if (handler !=
nullptr) {
234 int32_t data_length = 0;
235 uint8_t *
data =
nullptr;
236 unsigned long rpc_min, rpc_max;
237 unsigned long orb_min, orb_max;
238 double rpc_avg, orb_avg;
239 unsigned long count = 0;
240 rpc_max = orb_max = 0;
241 rpc_min = orb_min = 0xFFFFFFFF;
242 rpc_avg = orb_avg = 0.0;
244 int32_t num_topics = 0;
257 int32_t bytes_processed = 0;
259 for (
int i = 0; i < num_topics; ++i) {
260 uint8_t *new_pkt = &(data[bytes_processed]);
263 uint16_t check_msg_len = strlen(messageName);
266 PX4_ERR(
"Error: Packing error. Sent Msg Len. of[%d] but strlen returned:[%d]", header->
_MsgNameLen, check_msg_len);
267 PX4_ERR(
"Error: NumTopics: %d processing topic: %d msgLen[%d] dataLen[%d] data_len[%d] bytes processed: %d",
269 DumpData(data, data_length, num_topics);
273 uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1);
282 PX4_DEBUG(
"Received topic advertise message for: [%s] len[%d]\n", messageName, data_length);
286 PX4_DEBUG(
"Received topic unadvertise message for: [%s] len[%d]\n", messageName, data_length);
295 PX4_DEBUG(
"Error: Getting data over fastRPC channel\n");
302 if ((
unsigned long)(t2 - t1) < rpc_min) {
303 rpc_min = (
unsigned long)(t2 - t1);
306 if ((
unsigned long)(t2 -
t1) > rpc_max) {
307 rpc_max = (
unsigned long)(t2 - t1);
310 if ((
unsigned long)(t3 -
t2) < orb_min) {
311 orb_min = (
unsigned long)(t3 - t2);
314 if ((
unsigned long)(t3 -
t2) > orb_max) {
315 orb_max = (
unsigned long)(t3 - t2);
318 rpc_avg = ((double)((rpc_avg * (count - 1)) + (
unsigned long)(t2 -
t1))) / (
double)(count);
319 orb_avg = ((double)((orb_avg * (count - 1)) + (
unsigned long)(t3 -
t2))) / (
double)(count);
321 if ((
unsigned long)(t3 - check_time) >= 10000000) {
325 rpc_max = orb_max = 0;
326 rpc_min = orb_min = 0xFFFFFF;
336 PX4_DEBUG(
"[uORB::KraitFastRpcChannel::fastrpc_recv_thread] Exiting fastrpc_recv_thread\n");
339 void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics)
344 PX4_ERR(
"Error unable to open log file[%s]",
_log_file_name.c_str());
348 fprintf(fp,
"===== Data Len[%d] num_topics[%d] ======\n", length, num_topics);
350 for (
int i = 0; i < length; i += 16) {
351 int remaining_chars = length - i;
352 remaining_chars = (remaining_chars >= 16) ? 16 : remaining_chars;
354 fprintf(fp,
"%p - ", &(buffer[i]));
356 for (
int j = 0; j < remaining_chars; j++) {
357 fprintf(fp,
" %02X", buffer[i + j]);
366 for (
int j = 0; j < remaining_chars; j++) {
367 fprintf(fp,
"%c", (
char)buffer[i + j ]);
std::map< std::string, int32_t > _AdspSubscriberCache
bool Initialize()
Initializes the rpc channel px4 muorb.
virtual int16_t send_message(const char *messageName, int32_t length, uint8_t *data)
Sends the data message over the communication link.
static struct vehicle_status_s status
uORBCommunicator::IChannelRxHandler * _RxHandler
static hrt_abstime _log_check_time
static unsigned long _snd_msg_min
static double _snd_msg_avg
int32_t AddSubscriber(const char *topic)
High-resolution timer with callouts and timekeeping.
virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement)=0
Interface to process a received topic from remote.
virtual int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler)
Register Message Handler.
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE
int32_t TopicAdvertised(const char *topic)
Muorb related functions to pub/sub of orb topic from krait to adsp.
int32_t UnblockReceiveData()
virtual int16_t process_received_message(const char *messageName, int32_t length, uint8_t *data)=0
Interface to process the received data message.
static struct hrt_call t1
static unsigned long _overall_snd_count
static unsigned long _overall_snd_max
static void * thread_start(void *handler)
static std::string _log_file_name
virtual int16_t topic_advertised(const char *messageName)
Interface to notify the remote entity of a topic being advertised.
static double _overall_snd_avg
static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics)
static uORB::KraitFastRpcChannel * _InstancePtr
__BEGIN_DECLS typedef uint64_t hrt_abstime
Absolute time, in microsecond units.
Class passed to the communication link implementation to provide callbacks for received messages over a cha...
int32_t TopicUnadvertised(const char *topic)
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE
static unsigned long _overall_snd_min
static unsigned long _snd_msg_max
void fastrpc_recv_thread()
px4muorb::KraitRpcWrapper _KraitWrapper
static const int32_t _DATA_MSG_TYPE
virtual int16_t add_subscription(const char *messageName, int32_t msgRateInHz)
Interface to notify the remote entity of interest of a subscription for a message.
virtual int16_t topic_unadvertised(const char *messageName)
Interface to notify the remote entity of a topic being unadvertised and is no longer publishing messa...
int32_t RemoveSubscriber(const char *topic)
int32_t IsSubscriberPresent(const char *topic, int32_t *status)
KraitFastRpcChannel()
constructor.
int32_t ReceiveBulkData(uint8_t **bulk_data, int32_t *length_in_bytes, int32_t *topic_count)
int32_t SendData(const char *topic, int32_t length_in_bytes, const uint8_t *data)
static unsigned long _snd_msg_count
std::map< std::string, hrt_abstime > _AdspSubscriberSampleTimestamp
static const hrt_abstime _SubCacheRefreshRate
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
static hrt_abstime _log_check_interval
virtual int16_t remove_subscription(const char *messageName)
Interface to notify the remote entity of removal of a subscription.