PX4 Firmware
PX4 Autopilot Software http://px4.io
uORBKraitFastRpcChannel.cpp
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * Copyright (C) 2015 Mark Charlebois. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * 3. Neither the name PX4 nor the names of its contributors may be
16  * used to endorse or promote products derived from this software
17  * without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26  * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  *
32  ****************************************************************************/
33 
35 #include <px4_platform_common/log.h>
36 #include <px4_platform_common/tasks.h>
37 #include <drivers/drv_hrt.h>
38 #include <cstdio>
39 #include <pthread.h>
40 #include <string.h>
41 
42 #define LOG_TAG "uORBKraitFastRpcChannel.cpp"
43 
45 
46 static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics);
47 
48 // static initialization.
// Path of the file DumpData() opens in append mode to write its hex dumps.
49 static std::string _log_file_name = "./hex_dump.txt";
50 
// Statistics for the SendData() call inside send_message(): minimum, maximum
// and running average of (t3 - t2), plus the number of sends performed.
// The durations are differences of hrt_abstime values, i.e. microseconds.
51 static unsigned long _snd_msg_min = 0xFFFFFF;
52 static unsigned long _snd_msg_max = 0;
53 static double _snd_msg_avg = 0.0;
54 static unsigned long _snd_msg_count = 0;
// Statistics for the whole send_message() call, measured as (t4 - t1).
// NOTE(review): the statement that increments _overall_snd_count is not
// visible in this listing; it is only seen used as the divisor of the
// running average - confirm against the original file.
55 static unsigned long _overall_snd_min = 0xFFFFFF;
56 static unsigned long _overall_snd_max = 0;
57 static double _overall_snd_avg = 0.0;
58 static unsigned long _overall_snd_count = 0;
// Threshold compared against (t4 - _log_check_time) in send_message();
// 10000000 microseconds, i.e. 10 seconds.
60 static hrt_abstime _log_check_interval = 10000000;
61 
62 
64  _RxHandler(nullptr),
65  _ThreadStarted(false),
66  _ThreadShouldExit(false)
67 {
69 }
70 
71 int16_t uORB::KraitFastRpcChannel::topic_advertised(const char *messageName)
72 {
73  int16_t rc = 0;
74  PX4_DEBUG("Before calling TopicAdvertised for [%s]\n", messageName);
75  rc = _KraitWrapper.TopicAdvertised(messageName);
76  PX4_DEBUG("Response for TopicAdvertised for [%s], rc[%d]\n", messageName, rc);
77  return rc;
78 }
79 
80 int16_t uORB::KraitFastRpcChannel::topic_unadvertised(const char *messageName)
81 {
82  int16_t rc = 0;
83  PX4_DEBUG("Before calling TopicUnadvertised for [%s]\n", messageName);
84  rc = _KraitWrapper.TopicUnadvertised(messageName);
85  PX4_DEBUG("Response for TopicUnadvertised for [%s], rc[%d]\n", messageName, rc);
86  return rc;
87 }
88 
89 int16_t uORB::KraitFastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
90 {
91  int16_t rc = 0;
92  //PX4_DEBUG("Before calling AddSubscriber for [%s]\n", messageName);
93  rc = _KraitWrapper.AddSubscriber(messageName);
94  //PX4_DEBUG("Response for AddSubscriber for [%s], rc[%d]\n", messageName, rc);
95  return rc;
96 }
97 
98 int16_t uORB::KraitFastRpcChannel::remove_subscription(const char *messageName)
99 {
100  int16_t rc = 0;
101  //PX4_DEBUG("Before calling RemoveSubscriber for [%s]\n", messageName);
102  rc = _KraitWrapper.RemoveSubscriber(messageName);
103  //PX4_DEBUG("Response for RemoveSubscriber for [%s], rc[%d]\n", messageName, rc);
104  return rc;
105 }
106 
108 {
109  _RxHandler = handler;
110  return 0;
111 }
112 
/**
 * Sends the data of one topic over the communication link and records timing
 * statistics for the call.
 *
 * The remote-subscriber status of messageName is kept in
 * _AdspSubscriberCache. When the topic is not cached, the status is queried
 * with IsSubscriberPresent() and stored on success. SendData() is invoked
 * only when the cached status is greater than zero.
 *
 * Timestamps: t1 is taken on entry, t2/t3 immediately before/after
 * SendData(), and t4 after the send branch. (t3 - t2) feeds the _snd_msg_*
 * statistics and (t4 - t1) feeds the _overall_snd_* statistics.
 *
 * NOTE(review): several statements of this function are absent from this
 * listing (the embedded numbering skips 126, 130, 133, 150, 165 and 176-180),
 * so the cache-refresh condition of the else branch and most of the periodic
 * statistics reset are not visible here - confirm against the original file.
 *
 * @param messageName name of the topic being sent
 * @param length      length of data in bytes, passed through to SendData()
 * @param data        payload, passed through to SendData()
 * @return 0 when SendData() was not called, otherwise its return value
 *         converted to int16_t
 */
113 int16_t uORB::KraitFastRpcChannel::send_message(const char *messageName, int32_t length, uint8_t *data)
114 {
115  int16_t rc = 0;
116  int32_t status = 0;
117  hrt_abstime t1, t4;
118  hrt_abstime t2 = 0;
119  hrt_abstime t3 = 0;
120  t1 = hrt_absolute_time();
121 
122  if (_AdspSubscriberCache.find(std::string(messageName)) == _AdspSubscriberCache.end()) {
123  // check the status from adsp. as it is not cached.
124  if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) {
125  _AdspSubscriberCache[messageName] = status;
127  }
128 
129  } else {
// Topic already cached: the status is queried again and the cache updated.
// NOTE(review): the condition guarding this refresh is missing from the listing.
131  if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) {
132  _AdspSubscriberCache[messageName] = status;
134  }
135  }
136  }
137 
// NOTE(review): operator[] on the std::map inserts a zero entry when the topic
// is still absent (e.g. the query above failed), so later calls take the
// "already cached" branch for it.
138  if (_AdspSubscriberCache[messageName] > 0) {// there are remote subscribers
139  t2 = hrt_absolute_time();
140  rc = _KraitWrapper.SendData(messageName, length, data);
141  t3 = hrt_absolute_time();
142  _snd_msg_count++;
143  //PX4_DEBUG( "***** SENDING[%s] topic to remote....\n", messageName.c_str() );
144 
145  } else {
146  //PX4_DEBUG( "******* NO SUBSCRIBER PRESENT ON THE REMOTE FOR topic[%s] \n", messageName.c_str() );
147  }
148 
149  t4 = hrt_absolute_time();
// Update min/max of the total time spent in this call.
152  if ((t4 - t1) < _overall_snd_min) { _overall_snd_min = (t4 - t1); }
153 
154  if ((t4 - t1) > _overall_snd_max) { _overall_snd_max = (t4 - t1); }
155 
// SendData() statistics are only updated when a send actually happened.
156  if (_AdspSubscriberCache[messageName] > 0) {
157  if ((t3 - t2) < _snd_msg_min) { _snd_msg_min = (t3 - t2); }
158 
159  if ((t3 - t2) > _snd_msg_max) { _snd_msg_max = (t3 - t2); }
160 
// Incremental running average over _snd_msg_count samples.
161  _snd_msg_avg = ((double)((_snd_msg_avg * (_snd_msg_count - 1)) +
162  (unsigned long)(t3 - t2))) / (double)(_snd_msg_count);
163  }
164 
166  (unsigned long)(t4 - t1))) / (double)(_overall_snd_count);
167 
// Periodic statistics reset once _log_check_interval has elapsed.
168  if ((t4 - _log_check_time) > _log_check_interval) {
169  /*
170  PX4_DEBUG("SndMsgStats: overall_min: %lu overall_max: %lu snd_msg_min: %lu snd_msg_max: %lu",
171  _overall_snd_min, _overall_snd_max,
172  _snd_msg_min, _snd_msg_max);
173  PX4_DEBUG(".... overall_avg: %f (%lu) snd_msg_avg: %f (%lu)",
174  _overall_snd_avg, _overall_snd_count, _snd_msg_avg, _snd_msg_count);
175  */
// NOTE(review): the reset sentinel 0xFFFFFFF differs from the 0xFFFFFF used to
// initialize these statics - confirm which value is intended.
177  _overall_snd_min = _snd_msg_min = 0xFFFFFFF;
181  }
182 
183  //PX4_DEBUG( "Response for SendMessage for [%s],len[%d] rc[%d]\n", messageName.c_str(), length, rc );
184  return rc;
185 }
186 
188 {
189  _ThreadStarted = true;
190  _ThreadShouldExit = false;
191  pthread_attr_t recv_thread_attr;
192  pthread_attr_init(&recv_thread_attr);
193 
194  struct sched_param param;
195  (void)pthread_attr_getschedparam(&recv_thread_attr, &param);
196  param.sched_priority = SCHED_PRIORITY_MAX - 80;
197  (void)pthread_attr_setschedparam(&recv_thread_attr, &param);
198 
199  pthread_attr_setstacksize(&recv_thread_attr, 4096);
200 
201  if (pthread_create(&_RecvThread, &recv_thread_attr, thread_start, (void *)this) != 0) {
202  PX4_ERR("Error creating the receive thread for muorb");
203 
204  } else {
205  pthread_setname_np(_RecvThread, "muorb_krait_receiver");
206  }
207 
208  pthread_attr_destroy(&recv_thread_attr);
209 }
210 
212 {
213  _ThreadShouldExit = true;
215  //PX4_DEBUG("After calling UnblockReceiveData()...\n");
216  pthread_join(_RecvThread, NULL);
217  //PX4_DEBUG("*** After calling pthread_join...\n");
218  _ThreadStarted = false;
219 }
220 
222 {
223  if (handler != nullptr) {
224  ((uORB::KraitFastRpcChannel *)handler)->fastrpc_recv_thread();
225  }
226 
227  return 0;
228 }
229 
231 {
232  // sit in while loop.
233  int32_t rc = 0;
234  int32_t data_length = 0;
235  uint8_t *data = nullptr;
236  unsigned long rpc_min, rpc_max;
237  unsigned long orb_min, orb_max;
238  double rpc_avg, orb_avg;
239  unsigned long count = 0;
240  rpc_max = orb_max = 0;
241  rpc_min = orb_min = 0xFFFFFFFF;
242  rpc_avg = orb_avg = 0.0;
243 
244  int32_t num_topics = 0;
245 
246  hrt_abstime check_time = 0;
247 
248  while (!_ThreadShouldExit) {
249  hrt_abstime t1, t2, t3;
250  t1 = hrt_absolute_time();
251  rc = _KraitWrapper.ReceiveBulkData(&data, &data_length, &num_topics);
252 
253  t2 = hrt_absolute_time();
254 
255  if (rc == 0) {
256  //PX4_DEBUG( "Num of topics Received: %d", num_topics );
257  int32_t bytes_processed = 0;
258 
259  for (int i = 0; i < num_topics; ++i) {
260  uint8_t *new_pkt = &(data[bytes_processed]);
261  struct BulkTransferHeader *header = (struct BulkTransferHeader *)new_pkt;
262  char *messageName = (char *)(new_pkt + sizeof(struct BulkTransferHeader));
263  uint16_t check_msg_len = strlen(messageName);
264 
265  if (header->_MsgNameLen != (check_msg_len + 1)) {
266  PX4_ERR("Error: Packing error. Sent Msg Len. of[%d] but strlen returned:[%d]", header->_MsgNameLen, check_msg_len);
267  PX4_ERR("Error: NumTopics: %d processing topic: %d msgLen[%d] dataLen[%d] data_len[%d] bytes processed: %d",
268  num_topics, i, header->_MsgNameLen, header->_DataLen, data_length, bytes_processed);
269  DumpData(data, data_length, num_topics);
270  break;
271  }
272 
273  uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1);
274 
275  if (_RxHandler != nullptr) {
276  if (header->_MsgType == _DATA_MSG_TYPE) {
277  //PX4_DEBUG( "Received topic data for: [%s] len[%d]\n", messageName, data_length );
279  header->_DataLen, topic_data);
280 
281  } else if (header->_MsgType == _CONTROL_MSG_TYPE_ADVERTISE) {
282  PX4_DEBUG("Received topic advertise message for: [%s] len[%d]\n", messageName, data_length);
283  _RxHandler->process_remote_topic(messageName, true);
284 
285  } else if (header->_MsgType == _CONTROL_MSG_TYPE_UNADVERTISE) {
286  PX4_DEBUG("Received topic unadvertise message for: [%s] len[%d]\n", messageName, data_length);
287  _RxHandler->process_remote_topic(messageName, false);
288  }
289  }
290 
291  bytes_processed += header->_MsgNameLen + header->_DataLen + sizeof(struct BulkTransferHeader);
292  }
293 
294  } else {
295  PX4_DEBUG("Error: Getting data over fastRPC channel\n");
296  break;
297  }
298 
299  t3 = hrt_absolute_time();
300  count++;
301 
302  if ((unsigned long)(t2 - t1) < rpc_min) {
303  rpc_min = (unsigned long)(t2 - t1);
304  }
305 
306  if ((unsigned long)(t2 - t1) > rpc_max) {
307  rpc_max = (unsigned long)(t2 - t1);
308  }
309 
310  if ((unsigned long)(t3 - t2) < orb_min) {
311  orb_min = (unsigned long)(t3 - t2);
312  }
313 
314  if ((unsigned long)(t3 - t2) > orb_max) {
315  orb_max = (unsigned long)(t3 - t2);
316  }
317 
318  rpc_avg = ((double)((rpc_avg * (count - 1)) + (unsigned long)(t2 - t1))) / (double)(count);
319  orb_avg = ((double)((orb_avg * (count - 1)) + (unsigned long)(t3 - t2))) / (double)(count);
320 
321  if ((unsigned long)(t3 - check_time) >= 10000000) {
322  //PX4_DEBUG("Krait RPC Stats : rpc_min: %lu rpc_max: %lu rpc_avg: %f", rpc_min, rpc_max, rpc_avg);
323  //PX4_DEBUG("Krait RPC(orb) Stats: orb_min: %lu orb_max: %lu orb_avg: %f", orb_min, orb_max, orb_avg);
324  check_time = t3;
325  rpc_max = orb_max = 0;
326  rpc_min = orb_min = 0xFFFFFF;
327  orb_avg = 0;
328  rpc_avg = 0;
329  count = 0;
330  }
331 
332  //PX4_DEBUG("MsgName: %30s, t1: %lu, t2: %lu, t3: %lu, dt1: %lu, dt2: %lu",name, (unsigned long) t1, (unsigned long) t2, (unsigned long) t3,
333  // (unsigned long) (t2-t1), (unsigned long) (t3-t2));
334  }
335 
336  PX4_DEBUG("[uORB::KraitFastRpcChannel::fastrpc_recv_thread] Exiting fastrpc_recv_thread\n");
337 }
338 
339 void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics)
340 {
341  FILE *fp = fopen(_log_file_name.c_str(), "a+");
342 
343  if (fp == nullptr) {
344  PX4_ERR("Error unable to open log file[%s]", _log_file_name.c_str());
345  return;
346  }
347 
348  fprintf(fp, "===== Data Len[%d] num_topics[%d] ======\n", length, num_topics);
349 
350  for (int i = 0; i < length; i += 16) {
351  int remaining_chars = length - i;
352  remaining_chars = (remaining_chars >= 16) ? 16 : remaining_chars;
353 
354  fprintf(fp, "%p - ", &(buffer[i]));
355 
356  for (int j = 0; j < remaining_chars; j++) {
357  fprintf(fp, " %02X", buffer[i + j]);
358 
359  if (j == 7) {
360  fprintf(fp, " -");
361  }
362  }
363 
364  fprintf(fp, " ");
365 
366  for (int j = 0; j < remaining_chars; j++) {
367  fprintf(fp, "%c", (char)buffer[i + j ]);
368  }
369 
370  fprintf(fp, "\n");
371  }
372 
373  fclose(fp);
374 }
375 
std::map< std::string, int32_t > _AdspSubscriberCache
bool Initialize()
Initializes the rpc channel px4 muorb.
virtual int16_t send_message(const char *messageName, int32_t length, uint8_t *data)
Sends the data message over the communication link.
static struct vehicle_status_s status
Definition: Commander.cpp:138
uORBCommunicator::IChannelRxHandler * _RxHandler
t4
Definition: calcH_YAW312.c:5
static hrt_abstime _log_check_time
static unsigned long _snd_msg_min
static double _snd_msg_avg
int32_t AddSubscriber(const char *topic)
High-resolution timer with callouts and timekeeping.
virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement)=0
Interface to process a received topic from remote.
virtual int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler)
Register Message Handler.
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE
int32_t TopicAdvertised(const char *topic)
Muorb related functions to pub/sub of orb topic from krait to adsp.
virtual int16_t process_received_message(const char *messageName, int32_t length, uint8_t *data)=0
Interface to process the received data message.
static struct hrt_call t1
Definition: hrt_test.cpp:54
static unsigned long _overall_snd_count
static unsigned long _overall_snd_max
uint8_t * data
Definition: dataman.cpp:149
static void * thread_start(void *handler)
static std::string _log_file_name
virtual int16_t topic_advertised(const char *messageName)
Interface to notify the remote entity of a topic being advertised.
static double _overall_snd_avg
static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics)
static uORB::KraitFastRpcChannel * _InstancePtr
__BEGIN_DECLS typedef uint64_t hrt_abstime
Absolute time, in microsecond units.
Definition: drv_hrt.h:58
Class passed to the communication link implement to provide callback for received messages over a cha...
int32_t TopicUnadvertised(const char *topic)
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE
static unsigned long _overall_snd_min
static unsigned long _snd_msg_max
px4muorb::KraitRpcWrapper _KraitWrapper
t2
Definition: calcH_YAW312.c:3
virtual int16_t add_subscription(const char *messageName, int32_t msgRateInHz)
Interface to notify the remote entity of interest of a subscription for a message.
virtual int16_t topic_unadvertised(const char *messageName)
Interface to notify the remote entity of a topic being unadvertised and is no longer publishing messa...
int32_t RemoveSubscriber(const char *topic)
int32_t IsSubscriberPresent(const char *topic, int32_t *status)
int32_t ReceiveBulkData(uint8_t **bulk_data, int32_t *length_in_bytes, int32_t *topic_count)
int32_t SendData(const char *topic, int32_t length_in_bytes, const uint8_t *data)
static unsigned long _snd_msg_count
std::map< std::string, hrt_abstime > _AdspSubscriberSampleTimestamp
static const hrt_abstime _SubCacheRefreshRate
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
t3
Definition: calcH_YAW312.c:4
static hrt_abstime _log_check_interval
virtual int16_t remove_subscription(const char *messageName)
Interface to notify the remote entity of removal of a subscription.