PX4 Firmware
PX4 Autopilot Software http://px4.io
uORBFastRpcChannel.cpp
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * Copyright (C) 2015 Mark Charlebois. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * 3. Neither the name PX4 nor the names of its contributors may be
16  * used to endorse or promote products derived from this software
17  * without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26  * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  *
32  ****************************************************************************/
33 #include "uORBFastRpcChannel.hpp"
34 #include <px4_platform_common/log.h>
35 #include <algorithm>
36 #include <string.h>
37 #include <drivers/drv_hrt.h>
38 
39 // static initialization.
41 
42 static unsigned long _dropped_pkts;
43 static unsigned long _get_min = 0xFFFFFF;
44 static unsigned long _get_max = 0;
45 static unsigned long _min_q = 200;
46 static unsigned long _max_q = 0;
47 static unsigned long _avg_q = 0;
48 static unsigned long _count = 0;
49 static unsigned long _get_bulk_min = 0xFFFFFF;
50 static unsigned long _get_bulk_max = 0;
51 static unsigned long _bulk_topic_count_min = 0xFFFFFF;
52 static unsigned long _bulk_topic_count_max = 0;
53 
54 //==============================================================================
55 //==============================================================================
57  : _RxHandler(0)
58  , _DataQInIndex(0)
59  , _DataQOutIndex(0)
60  , _ControlQInIndex(0)
61  , _ControlQOutIndex(0)
62 {
63  for (int32_t i = 0; i < _MAX_MSG_QUEUE_SIZE; ++ i) {
65  _DataMsgQueue[i]._Length = 0;
66  _DataMsgQueue[i]._Buffer = 0;
67  }
68 
69  _RemoteSubscribers.clear();
70 }
71 
72 //==============================================================================
73 //==============================================================================
74 int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
75 {
77 }
78 
79 //==============================================================================
80 //==============================================================================
81 int16_t uORB::FastRpcChannel::topic_unadvertised(const char *messageName)
82 {
84 }
85 
86 //==============================================================================
87 //==============================================================================
88 int16_t uORB::FastRpcChannel::control_msg_queue_add(int32_t msgtype, const char *messageName)
89 {
90  int16_t rc = 0;
91  hrt_abstime t1, t2;
92  static hrt_abstime check_time = 0;
93 
94  t1 = hrt_absolute_time();
95  _QueueMutex.lock();
96  bool overwriteData = false;
97 
98  if (IsControlQFull()) {
99  // queue is full. Overwrite the oldest data.
101 
103  _ControlQOutIndex = 0;
104  }
105 
106  overwriteData = true;
107  _dropped_pkts++;
108  }
109 
111  _ControlMsgQueue[ _ControlQInIndex ]._MsgName = messageName;
112 
114 
116  _ControlQInIndex = 0;
117  }
118 
119  // the assumption here is that each caller reads only one data from either control or data queue.
120  if (ControlQSize() == 1 && DataQSize() == 0) { // post it only of the queue moves from empty to available.
122  }
123 
124  if ((unsigned long)ControlQSize() < _min_q) { _min_q = (unsigned long)ControlQSize(); }
125 
126  if ((unsigned long)ControlQSize() > _max_q) { _max_q = (unsigned long)ControlQSize(); }
127 
128  _count++;
129  _avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(ControlQSize()))) / (double)(_count);
130 
132  t2 = hrt_absolute_time();
133 
134  if ((unsigned long)(t2 - check_time) > 10000000) {
135  //PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1));
136  //PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count);
138  _max_q = 0;
139  _avg_q = 0;
140  _count = 0;
141  check_time = t2;
142  }
143 
144  return rc;
145 }
146 
147 //==============================================================================
148 //==============================================================================
149 int16_t uORB::FastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
150 {
151  int16_t rc = 0;
152  _Subscribers.push_back(messageName);
153  PX4_DEBUG("Adding message[%s] to subscriber queue...", messageName);
154  return rc;
155 }
156 
157 //==============================================================================
158 //==============================================================================
159 int16_t uORB::FastRpcChannel::remove_subscription(const char *messageName)
160 {
161  int16_t rc = 0;
162  _Subscribers.remove(messageName);
163 
164  return rc;
165 }
166 
167 int16_t uORB::FastRpcChannel::is_subscriber_present(const char *messageName, int32_t *status)
168 {
169  int16_t rc = 0;
170 
171  if (std::find(_Subscribers.begin(), _Subscribers.end(), messageName) != _Subscribers.end()) {
172  *status = 1;
173  //PX4_DEBUG("******* Found subscriber for message[%s]....", messageName);
174 
175  } else {
176  *status = 0;
177  //PX4_WARN("@@@@@ Subscriber not found for[%s]...numSubscribers[%d]", messageName, _Subscribers.size());
178  int i = 0;
179 
180  for (std::list<std::string>::iterator it = _Subscribers.begin(); it != _Subscribers.end(); ++it) {
181  if (*it == messageName) {
182  PX4_DEBUG("##### Found the message[%s] in the subscriber list-index[%d]", messageName, i);
183  }
184 
185  ++i;
186  }
187  }
188 
189  return rc;
190 }
191 
193 {
194  PX4_DEBUG("[unblock_get_data_method] calling post method for _DataAvailableSemaphore()");
196  return 0;
197 }
198 //==============================================================================
199 //==============================================================================
201 {
202  _RxHandler = handler;
203  return 0;
204 }
205 
206 
207 //==============================================================================
208 //==============================================================================
209 int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t length, uint8_t *data)
210 {
211  int16_t rc = 0;
212  hrt_abstime t1, t2;
213  static hrt_abstime check_time = 0;
214 
215  if (_RemoteSubscribers.find(messageName) == _RemoteSubscribers.end()) {
216  //there is no-remote subscriber. So do not queue the message.
217  return rc;
218  }
219 
220  t1 = hrt_absolute_time();
221  _QueueMutex.lock();
222  bool overwriteData = false;
223 
224  if (IsDataQFull()) {
225  // queue is full. Overwrite the oldest data.
226  //PX4_WARN("[send_message] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]",
227  // _DataQInIndex, _DataQOutIndex, _MAX_MSG_QUEUE_SIZE);
228  _DataQOutIndex++;
229 
231  _DataQOutIndex = 0;
232  }
233 
234  overwriteData = true;
235  _dropped_pkts++;
236  }
237 
238  // now check to see if the data queue's buffer size if large enough to memcpy the data.
239  // if not, delete the old buffer and re-create a new buffer of larger size.
241 
242  // now memcpy the data to the buffer.
243  memcpy(_DataMsgQueue[ _DataQInIndex ]._Buffer, data, length);
244  _DataMsgQueue[ _DataQInIndex ]._Length = length;
245  _DataMsgQueue[ _DataQInIndex ]._MsgName = messageName;
246 
247  _DataQInIndex++;
248 
250  _DataQInIndex = 0;
251  }
252 
253  // the assumption here is that each caller reads only one data from either control or data queue.
254  //if (!overwriteData) {
255  if (DataQSize() == 1 && ControlQSize() == 0) { // post it only of the queue moves from empty to available.
257  }
258 
259  if ((unsigned long)DataQSize() < _min_q) { _min_q = (unsigned long)DataQSize(); }
260 
261  if ((unsigned long)DataQSize() > _max_q) { _max_q = (unsigned long)DataQSize(); }
262 
263  _count++;
264  _avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(DataQSize()))) / (double)(_count);
265 
267  t2 = hrt_absolute_time();
268 
269  if ((unsigned long)(t2 - check_time) > 10000000) {
270  //PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1));
271  //PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count);
273  _max_q = 0;
274  _avg_q = 0;
275  _count = 0;
276  check_time = t2;
277  }
278 
279  return rc;
280 }
281 
282 //==============================================================================
283 //==============================================================================
284 void uORB::FastRpcChannel::check_and_expand_data_buffer(int32_t index, int32_t length)
285 {
286  if (_DataMsgQueue[ index ]._MaxBufferSize < length) {
287  // create a new buffer of size length and delete old buffer.
288  if (_DataMsgQueue[ index ]._Buffer != 0) {
289  delete _DataMsgQueue[ index ]._Buffer;
290  }
291 
292  _DataMsgQueue[ index ]._Buffer = new uint8_t[ length ];
293 
294  if (_DataMsgQueue[ index ]._Buffer == 0) {
295  PX4_ERR("Error[check_and_expand_data_buffer] Failed to allocate data queue buffer of size[%ld]", length);
296  _DataMsgQueue[ index ]._MaxBufferSize = 0;
297  return;
298  }
299 
300  _DataMsgQueue[ index ]._MaxBufferSize = length;
301  }
302 }
303 
305 {
306  int32_t rc;
308  rc %= _MAX_MSG_QUEUE_SIZE;
309  return rc;
310 }
311 
313 {
314  int32_t rc;
316  rc %= _MAX_MSG_QUEUE_SIZE;
317  return rc;
318 }
319 
321 {
322  return (ControlQSize() == (_MAX_MSG_QUEUE_SIZE - 1));
323 }
324 
326 {
327  return (ControlQSize() == 0);
328 }
329 
331 {
332  return (DataQSize() == (_MAX_MSG_QUEUE_SIZE - 1));
333 }
334 
336 {
337  return (DataQSize() == 0);
338 }
339 
341 (
342  int32_t *msg_type,
343  char *topic_name,
344  int32_t topic_name_len,
345  uint8_t *data,
346  int32_t data_len_in_bytes,
347  int32_t *bytes_returned
348 )
349 {
350  int16_t rc = 0;
351  PX4_DEBUG("Get data should not be called...");
352  return -1;
353  // wait for data availability
354  static hrt_abstime check_time = 0;
357  // hrt_abstime t2 = hrt_absolute_time();
358  _QueueMutex.lock();
359 
360  if (DataQSize() != 0 || ControlQSize() != 0) {
361  if (ControlQSize() > 0) {
362  // read the first element of the Control Queue.
364 
365  if ((int)_ControlMsgQueue[ _ControlQOutIndex ]._MsgName.size() < (int)topic_name_len) {
366  memcpy
367  (
368  topic_name,
369  _ControlMsgQueue[ _ControlQOutIndex ]._MsgName.c_str(),
371  );
372 
373  topic_name[_ControlMsgQueue[ _ControlQOutIndex ]._MsgName.size()] = 0;
374 
375  *bytes_returned = 0;
376 
378 
380  _ControlQOutIndex = 0;
381  }
382 
383  } else {
384  PX4_ERR("Error[get_data-CONTROL]: max topic_name_len[%ld] < controlMsgLen[%d]",
385  topic_name_len,
386  _ControlMsgQueue[ _ControlQOutIndex ]._MsgName.size()
387  );
388  rc = -1;
389  }
390 
391  } else {
392  // read the first element of the Control Queue.
393  *msg_type = _DATA_MSG_TYPE;
394 
395  if (((int)_DataMsgQueue[ _DataQOutIndex ]._MsgName.size() < topic_name_len) ||
396  (_DataMsgQueue[ _DataQOutIndex ]._Length < data_len_in_bytes)) {
397  memcpy
398  (
399  topic_name,
400  _DataMsgQueue[ _DataQOutIndex ]._MsgName.c_str(),
402  );
403 
404  topic_name[_DataMsgQueue[ _DataQOutIndex ]._MsgName.size()] = 0;
405 
406  *bytes_returned = _DataMsgQueue[ _DataQOutIndex ]._Length;
407  memcpy(data, _DataMsgQueue[ _DataQOutIndex ]._Buffer, _DataMsgQueue[ _DataQOutIndex ]._Length);
408 
409  _DataQOutIndex++;
410 
412  _DataQOutIndex = 0;
413  }
414 
415  } else {
416  PX4_ERR("Error:[get_data-DATA] type msg max topic_name_len[%ld] > dataMsgLen[%d] ",
417  topic_name_len,
418  _DataMsgQueue[ _DataQOutIndex ]._MsgName.size()
419  );
420  PX4_ERR("Error:[get_data-DATA] Or data_buffer_len[%ld] > message_size[%ld] ",
421  data_len_in_bytes,
422  _DataMsgQueue[ _DataQOutIndex ]._Length
423  );
424 
425  rc = -1;
426  }
427  }
428 
429  } else {
430  PX4_ERR("[get_data] Error: Semaphore is up when there is no data on the control/data queues");
431  rc = -1;
432  }
433 
436 
437  if ((unsigned long)(t3 - t1) > _get_max) { _get_max = (unsigned long)(t3 - t1); }
438 
439  if ((unsigned long)(t3 - t1) < _get_min) { _get_min = (unsigned long)(t3 - t1); }
440 
441  if ((unsigned long)(t3 - check_time) > 1000000) {
442  if (rc != 0) {
443  topic_name[0] = '\0';
444  }
445 
446  /*
447  PX4_DEBUG("GetData: %30s: t1: %lu t2: %lu t3: %lu", topic_name, (unsigned long)t1, (unsigned long)t2,
448  (unsigned long)t3);
449  PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize());
450  PX4_DEBUG("ADSP RPC Stats: _get_min: %lu _get_max: %lu _dropped_pkts: %lu", _get_min, _get_max, _dropped_pkts);
451  */
452  check_time = t3;
453  }
454 
455  return rc;
456 }
457 
458 
460 (
461  uint8_t *buffer,
462  int32_t max_buffer_in_bytes,
463  int32_t *returned_bytes,
464  int32_t *topic_count
465 )
466 {
467  int16_t rc = 0;
468  // wait for data availability
469  static hrt_abstime check_time = 0;
472  //hrt_abstime t2 = hrt_absolute_time();
473 
474  _QueueMutex.lock();
475 
476  int32_t bytes_copied = 0;
477  int32_t copy_result = 0;
478  *returned_bytes = 0;
479  *topic_count = 0;
480  int32_t topic_count_to_return = 0;
481 
482  if (DataQSize() != 0 || ControlQSize() != 0) {
483  if (DataQSize() != 0) {
484  //PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() );
485  topic_count_to_return = DataQSize();
486 
487  while (DataQSize() != 0) {
488  // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
489  //_DataAvailableSemaphore.wait();
490  if (get_msg_size_at(true, _DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
491  // there is enough space in the buffer, copy the data.
492  //PX4_DEBUG( "Coping Data to buffer..." );
493  copy_result = copy_msg_to_buffer(true, _DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
494 
495  if (copy_result == -1) {
496  if (bytes_copied == 0) {
497  rc = -1;
498  }
499 
500  break;
501 
502  } else {
503  //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
504  // buffer[bytes_copied], \
505  // buffer[bytes_copied+1], \
506  // buffer[bytes_copied+2], \
507  // buffer[bytes_copied+3] );
508  bytes_copied += copy_result;
509  (*topic_count)++;
510  *returned_bytes = bytes_copied;
511  _DataQOutIndex++;
512 
514  _DataQOutIndex = 0;
515  }
516  }
517 
518  } else {
519  if (bytes_copied == 0) {
520  rc = -1;
521  PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
522 
523  } else {
524  PX4_DEBUG("Exiting out of the while loop...");
525  }
526 
527  break;
528  }
529  }
530  }
531 
532  if (ControlQSize() != 0) {
533  //PX4_DEBUG( "get_bulk_data: QSize: %d", ControlQSize() );
534  topic_count_to_return += ControlQSize();
535 
536  while (ControlQSize() != 0) {
537  // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
538  //_DataAvailableSemaphore.wait();
539  if (get_msg_size_at(false, _ControlQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
540  // there is enough space in the buffer, copy the data.
541  //PX4_DEBUG( "Coping Control msg to buffer..." );
542  copy_result = copy_msg_to_buffer(false, _ControlQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
543 
544  if (copy_result == -1) {
545  if (bytes_copied == 0) {
546  rc = -1;
547  }
548 
549  break;
550 
551  } else {
552  //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
553  // buffer[bytes_copied], \
554  // buffer[bytes_copied+1], \
555  // buffer[bytes_copied+2], \
556  // buffer[bytes_copied+3] );
557  bytes_copied += copy_result;
558  (*topic_count)++;
559  *returned_bytes = bytes_copied;
561 
563  _ControlQOutIndex = 0;
564  }
565  }
566 
567  } else {
568  if (bytes_copied == 0) {
569  rc = -1;
570  PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
571 
572  } else {
573  PX4_DEBUG("Exiting out of the while loop...");
574  }
575 
576  break;
577  }
578  }
579  }
580 
581  } else {
582  PX4_ERR("[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues");
583  rc = -1;
584  }
585 
586  if (topic_count_to_return != *topic_count) {
587  PX4_WARN("Not sending all topics: topics_to_return:[%ld] topics_returning:[%ld]", topic_count_to_return, *topic_count);
588  }
589 
592 
593  if ((unsigned long)(t3 - t1) > _get_bulk_max) { _get_bulk_max = (unsigned long)(t3 - t1); }
594 
595  if ((unsigned long)(t3 - t1) < _get_bulk_min) { _get_bulk_min = (unsigned long)(t3 - t1); }
596 
597  if ((unsigned long)(*topic_count) > _bulk_topic_count_max) { _bulk_topic_count_max = (unsigned long)(*topic_count); }
598 
599  if ((unsigned long)(*topic_count) < _bulk_topic_count_min) { _bulk_topic_count_min = (unsigned long)(*topic_count); }
600 
601  if ((unsigned long)(t3 - check_time) > 10000000) {
602  //PX4_DEBUG("GetData: t1: %lu t2: %lu t3: %lu", (unsigned long)t1, (unsigned long)t2, (unsigned long)t3);
603  //PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize());
604  //PX4_DEBUG("ADSP RPC Stats: _get_bulk_min: %lu _get_bulk_max: %lu _dropped_pkts: %lu", _get_bulk_min, _get_bulk_max,
605  // _dropped_pkts);
606  //PX4_DEBUG(" .... topic_count_min: %lu topic_count_max: %lu", _bulk_topic_count_min, _bulk_topic_count_max);
607  _get_bulk_max = 0;
608  _get_bulk_min = 0xFFFFFF;
609  _bulk_topic_count_min = 0xFFFFFF;
611  check_time = t3;
612  }
613 
614  //PX4_DEBUG( "Returning topics: %d bytes_returned: %d", *topic_count, *returned_bytes );
615  return rc;
616 }
617 
618 int32_t uORB::FastRpcChannel::get_msg_size_at(bool isData, int32_t index)
619 {
620  // the assumption here is that this is called within the context of semaphore,
621  // hence lock/unlock is not needed.
622  int32_t rc = 0;
623 
624  if (isData) {
625  rc += _DataMsgQueue[ index ]._Length;
626  rc += _DataMsgQueue[ index ]._MsgName.size() + 1;
627 
628  } else {
629  rc += _ControlMsgQueue[ index ]._MsgName.size() + 1;
630  }
631 
632  rc += _PACKET_HEADER_SIZE;
633  return rc;
634 }
635 
636 int32_t uORB::FastRpcChannel::copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset,
637  int32_t dst_buffer_len)
638 {
639  int32_t rc = -1;
640 
641  // before calling this method the following are assumed:
642  // * sem_lock is acquired for data protection
643  // * the dst_buffer is validated to
644 
645  uint16_t msg_size = (isData ?
646  (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size()) :
647  (uint16_t)(_ControlMsgQueue[ src_index ]._MsgName.size()));
648 
649  // compute the different offsets to pack the packets.
650  int32_t field_header_offset = offset;
651  int32_t field_topic_name_offset = field_header_offset + sizeof(struct BulkTransferHeader);
652  int32_t field_data_offset = field_topic_name_offset + msg_size + 1;
653 
654  int16_t msg_type = isData ? _DATA_MSG_TYPE : _ControlMsgQueue[ src_index ]._Type;
655 
656  struct BulkTransferHeader header = { (uint16_t)msg_type, (uint16_t)(msg_size + 1),
657  (uint16_t)(isData ? (_DataMsgQueue[ src_index ]._Length) : 0)
658  };
659 
660 
661  //PX4_DEBUG( "Offsets: header[%d] name[%d] data[%d]",
662  // field_header_offset,
663  // field_topic_name_offset,
664  // field_data_offset );
665 
666  if (isData && (field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) {
667  memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header));
668  // pack the data here.
669  memmove
670  (
671  &(dst_buffer[field_topic_name_offset]),
672  _DataMsgQueue[ src_index ]._MsgName.c_str(),
673  _DataMsgQueue[ src_index ]._MsgName.size()
674  );
675 
676  if (_DataMsgQueue[ src_index ]._MsgName.size() == 0) {
677  PX4_WARN("########## Error MsgName cannot be zero: ");
678  }
679 
680  dst_buffer[ field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size()] = '\0';
681  memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length);
682  rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset;
683 
684  } else if (field_data_offset < dst_buffer_len) { //This is a control message
685  memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header));
686  // pack the data here.
687  memmove
688  (
689  &(dst_buffer[field_topic_name_offset]),
690  _ControlMsgQueue[ src_index ]._MsgName.c_str(),
691  _ControlMsgQueue[ src_index ]._MsgName.size()
692  );
693 
694  if (_ControlMsgQueue[ src_index ]._MsgName.size() == 0) {
695  PX4_WARN("########## Error MsgName cannot be zero: ");
696  }
697 
698  dst_buffer[ field_topic_name_offset + _ControlMsgQueue[ src_index ]._MsgName.size()] = '\0';
699  rc = field_data_offset - offset;
700 
701  } else {
702  PX4_WARN("Error coping the Msg to dst buffer, insuffienct space. ");
703 
704  if (isData) {
705  PX4_WARN("Data... offset[%ld] len[%ld] data_msg_len[%ld]",
706  offset, dst_buffer_len, (field_data_offset - offset) + _DataMsgQueue[ src_index ]._Length);
707 
708  } else {
709  PX4_WARN("ControlMsg... offset[%ld] len[%ld]",
710  offset, dst_buffer_len, (field_data_offset - offset));
711  }
712  }
713 
714  return rc;
715 }
static const int32_t _MAX_MSG_QUEUE_SIZE
data structure to store the messages to be retrived by Krait.
static struct vehicle_status_s status
Definition: Commander.cpp:138
static unsigned long _get_bulk_min
virtual int16_t add_subscription(const char *messageName, int32_t msgRateInHz)
Interface to notify the remote entity of interest of a subscription for a message.
static unsigned long _count
static unsigned long _bulk_topic_count_max
FastRpcChannel()
constructor.
static unsigned long _get_min
static unsigned long _dropped_pkts
static unsigned long _get_bulk_max
struct FastRpcControlMsg _ControlMsgQueue[_MAX_MSG_QUEUE_SIZE]
High-resolution timer with callouts and timekeeping.
int32_t i
Definition: param.h:418
virtual int16_t send_message(const char *messageName, int32_t length, uint8_t *data)
Sends the data message over the communication link.
static unsigned long _avg_q
static unsigned long _min_q
void check_and_expand_data_buffer(int32_t index, int32_t length)
static unsigned long _bulk_topic_count_min
static struct hrt_call t1
Definition: hrt_test.cpp:54
static unsigned long _max_q
struct FastRpcDataMsg _DataMsgQueue[_MAX_MSG_QUEUE_SIZE]
int16_t control_msg_queue_add(int32_t msgtype, const char *messageName)
uint8_t * data
Definition: dataman.cpp:149
virtual int16_t topic_unadvertised(const char *messageName)
Interface to notify the remote entity of a topic being unadvertised and is no longer publishing messa...
uORBCommunicator::IChannelRxHandler * _RxHandler
int32_t copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len)
int16_t is_subscriber_present(const char *messageName, int32_t *status)
static const int32_t _DATA_MSG_TYPE
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE
int16_t get_bulk_data(uint8_t *buffer, int32_t max_size_in_bytes, int32_t *returned_bytes, int32_t *topic_count)
__BEGIN_DECLS typedef uint64_t hrt_abstime
Absolute time, in microsecond units.
Definition: drv_hrt.h:58
Class passed to the communication link implement to provide callback for received messages over a cha...
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE
virtual int16_t remove_subscription(const char *messageName)
Interface to notify the remote entity of removal of a subscription.
t2
Definition: calcH_YAW312.c:3
std::list< std::string > _Subscribers
static uORB::FastRpcChannel _Instance
std::set< std::string > _RemoteSubscribers
virtual int16_t topic_advertised(const char *messageName)
Interface to notify the remote entity of a topic being advertised.
static const int32_t _PACKET_HEADER_SIZE
int32_t get_msg_size_at(bool isData, int32_t index)
virtual int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler)
Register Message Handler.
int16_t get_data(int32_t *msg_type, char *topic_name, int32_t topic_name_len, uint8_t *data, int32_t data_len_in_bytes, int32_t *bytes_returned)
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
t3
Definition: calcH_YAW312.c:4
static unsigned long _get_max