34 #include <px4_platform_common/log.h> 61 , _ControlQOutIndex(0)
96 bool overwriteData =
false;
106 overwriteData =
true;
134 if ((
unsigned long)(t2 - check_time) > 10000000) {
153 PX4_DEBUG(
"Adding message[%s] to subscriber queue...", messageName);
181 if (*it == messageName) {
182 PX4_DEBUG(
"##### Found the message[%s] in the subscriber list-index[%d]", messageName, i);
194 PX4_DEBUG(
"[unblock_get_data_method] calling post method for _DataAvailableSemaphore()");
222 bool overwriteData =
false;
234 overwriteData =
true;
269 if ((
unsigned long)(t2 - check_time) > 10000000) {
295 PX4_ERR(
"Error[check_and_expand_data_buffer] Failed to allocate data queue buffer of size[%ld]", length);
344 int32_t topic_name_len,
346 int32_t data_len_in_bytes,
347 int32_t *bytes_returned
351 PX4_DEBUG(
"Get data should not be called...");
384 PX4_ERR(
"Error[get_data-CONTROL]: max topic_name_len[%ld] < controlMsgLen[%d]",
416 PX4_ERR(
"Error:[get_data-DATA] type msg max topic_name_len[%ld] > dataMsgLen[%d] ",
420 PX4_ERR(
"Error:[get_data-DATA] Or data_buffer_len[%ld] > message_size[%ld] ",
430 PX4_ERR(
"[get_data] Error: Semaphore is up when there is no data on the control/data queues");
437 if ((
unsigned long)(t3 - t1) >
_get_max) {
_get_max = (
unsigned long)(t3 - t1); }
441 if ((
unsigned long)(t3 - check_time) > 1000000) {
443 topic_name[0] =
'\0';
462 int32_t max_buffer_in_bytes,
463 int32_t *returned_bytes,
476 int32_t bytes_copied = 0;
477 int32_t copy_result = 0;
480 int32_t topic_count_to_return = 0;
495 if (copy_result == -1) {
496 if (bytes_copied == 0) {
508 bytes_copied += copy_result;
510 *returned_bytes = bytes_copied;
519 if (bytes_copied == 0) {
521 PX4_WARN(
"ERROR: Insufficent space in data buffer, no topics returned");
524 PX4_DEBUG(
"Exiting out of the while loop...");
544 if (copy_result == -1) {
545 if (bytes_copied == 0) {
557 bytes_copied += copy_result;
559 *returned_bytes = bytes_copied;
568 if (bytes_copied == 0) {
570 PX4_WARN(
"ERROR: Insufficent space in data buffer, no topics returned");
573 PX4_DEBUG(
"Exiting out of the while loop...");
582 PX4_ERR(
"[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues");
586 if (topic_count_to_return != *topic_count) {
587 PX4_WARN(
"Not sending all topics: topics_to_return:[%ld] topics_returning:[%ld]", topic_count_to_return, *topic_count);
601 if ((
unsigned long)(t3 - check_time) > 10000000) {
637 int32_t dst_buffer_len)
645 uint16_t msg_size = (isData ?
650 int32_t field_header_offset = offset;
651 int32_t field_topic_name_offset = field_header_offset +
sizeof(
struct BulkTransferHeader);
652 int32_t field_data_offset = field_topic_name_offset + msg_size + 1;
666 if (isData && (field_data_offset +
_DataMsgQueue[ src_index ]._Length) < dst_buffer_len) {
667 memmove(&(dst_buffer[field_header_offset]), (
char *)(&header),
sizeof(header));
671 &(dst_buffer[field_topic_name_offset]),
677 PX4_WARN(
"########## Error MsgName cannot be zero: ");
681 memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length);
682 rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset;
684 }
else if (field_data_offset < dst_buffer_len) {
685 memmove(&(dst_buffer[field_header_offset]), (
char *)(&header),
sizeof(header));
689 &(dst_buffer[field_topic_name_offset]),
695 PX4_WARN(
"########## Error MsgName cannot be zero: ");
699 rc = field_data_offset - offset;
702 PX4_WARN(
"Error coping the Msg to dst buffer, insuffienct space. ");
705 PX4_WARN(
"Data... offset[%ld] len[%ld] data_msg_len[%ld]",
706 offset, dst_buffer_len, (field_data_offset - offset) +
_DataMsgQueue[ src_index ]._Length);
709 PX4_WARN(
"ControlMsg... offset[%ld] len[%ld]",
710 offset, dst_buffer_len, (field_data_offset - offset));
static const int32_t _MAX_MSG_QUEUE_SIZE
Data structure to store the messages to be retrieved by Krait.
static struct vehicle_status_s status
static unsigned long _get_bulk_min
virtual int16_t add_subscription(const char *messageName, int32_t msgRateInHz)
Interface to notify the remote entity of interest of a subscription for a message.
static unsigned long _count
static unsigned long _bulk_topic_count_max
Semaphore _DataAvailableSemaphore
FastRpcChannel()
constructor.
static unsigned long _get_min
static unsigned long _dropped_pkts
static unsigned long _get_bulk_max
struct FastRpcControlMsg _ControlMsgQueue[_MAX_MSG_QUEUE_SIZE]
High-resolution timer with callouts and timekeeping.
virtual int16_t send_message(const char *messageName, int32_t length, uint8_t *data)
Sends the data message over the communication link.
static unsigned long _avg_q
static unsigned long _min_q
void check_and_expand_data_buffer(int32_t index, int32_t length)
static unsigned long _bulk_topic_count_min
static struct hrt_call t1
static unsigned long _max_q
struct FastRpcDataMsg _DataMsgQueue[_MAX_MSG_QUEUE_SIZE]
int16_t control_msg_queue_add(int32_t msgtype, const char *messageName)
virtual int16_t topic_unadvertised(const char *messageName)
Interface to notify the remote entity of a topic being unadvertised and is no longer publishing messa...
uORBCommunicator::IChannelRxHandler * _RxHandler
int32_t copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len)
int16_t is_subscriber_present(const char *messageName, int32_t *status)
static const int32_t _DATA_MSG_TYPE
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE
int16_t get_bulk_data(uint8_t *buffer, int32_t max_size_in_bytes, int32_t *returned_bytes, int32_t *topic_count)
__BEGIN_DECLS typedef uint64_t hrt_abstime
Absolute time, in microsecond units.
Class passed to the communication link implementation to provide callbacks for received messages over a cha...
int32_t _ControlQOutIndex
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE
virtual int16_t remove_subscription(const char *messageName)
Interface to notify the remote entity of removal of a subscription.
std::list< std::string > _Subscribers
static uORB::FastRpcChannel _Instance
std::set< std::string > _RemoteSubscribers
virtual int16_t topic_advertised(const char *messageName)
Interface to notify the remote entity of a topic being advertised.
static const int32_t _PACKET_HEADER_SIZE
int32_t get_msg_size_at(bool isData, int32_t index)
virtual int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler)
Register Message Handler.
int16_t unblock_get_data_method()
int16_t get_data(int32_t *msg_type, char *topic_name, int32_t topic_name_len, uint8_t *data, int32_t data_len_in_bytes, int32_t *bytes_returned)
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
static unsigned long _get_max