PX4 Firmware
PX4 Autopilot Software http://px4.io
Replay.cpp
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * Copyright (c) 2016-2019 PX4 Development Team. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * 3. Neither the name PX4 nor the names of its contributors may be
16  * used to endorse or promote products derived from this software
17  * without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26  * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  *
32  ****************************************************************************/
33 
34 /**
35  * @file Replay.cpp
36  * This module reads messages from an ULog file and publishes them.
37  * It sets the parameters from the log file and handles user-defined
38  * parameter overrides.
39  *
40  * @author Beat Kueng
41 */
42 
43 #include <drivers/drv_hrt.h>
44 #include <px4_platform_common/defines.h>
45 #include <px4_platform_common/posix.h>
46 #include <px4_platform_common/tasks.h>
47 #include <px4_platform_common/time.h>
48 #include <px4_platform_common/shutdown.h>
49 #include <lib/parameters/param.h>
50 
51 #include <cstring>
52 #include <float.h>
53 #include <fstream>
54 #include <iostream>
55 #include <math.h>
56 #include <time.h>
57 #include <sstream>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <string>
61 
62 #include <logger/messages.h>
63 
64 #include "Replay.hpp"
65 #include "ReplayEkf2.hpp"
66 
67 #define PARAMS_OVERRIDE_FILE PX4_ROOTFSDIR "/replay_params.txt"
68 
69 using namespace std;
70 
71 namespace px4
72 {
73 class Replay;
74 
75 char *Replay::_replay_file = nullptr;
76 
77 Replay::CompatSensorCombinedDtType::CompatSensorCombinedDtType(int gyro_integral_dt_offset_log,
78  int gyro_integral_dt_offset_intern, int accelerometer_integral_dt_offset_log,
79  int accelerometer_integral_dt_offset_intern)
80  : _gyro_integral_dt_offset_log(gyro_integral_dt_offset_log),
81  _gyro_integral_dt_offset_intern(gyro_integral_dt_offset_intern),
82  _accelerometer_integral_dt_offset_log(accelerometer_integral_dt_offset_log),
83  _accelerometer_integral_dt_offset_intern(accelerometer_integral_dt_offset_intern)
84 {
85 }
86 
88 {
89  for (size_t i = 0; i < _subscriptions.size(); ++i) {
90  delete (_subscriptions[i]);
91  }
92 
93  _subscriptions.clear();
94 }
95 
96 void *
98 {
99  // the types have the same size so we can do the conversion in-place
100  uint8_t *ptr = (uint8_t *)data;
101 
102  float gyro_integral_dt;
103  memcpy(&gyro_integral_dt, ptr + _gyro_integral_dt_offset_log, sizeof(float));
104 
105  float accel_integral_dt;
106  memcpy(&accel_integral_dt, ptr + _accelerometer_integral_dt_offset_log, sizeof(float));
107 
108  uint32_t igyro_integral_dt = (uint32_t)(gyro_integral_dt * 1e6f);
109  memcpy(ptr + _gyro_integral_dt_offset_intern, &igyro_integral_dt, sizeof(float));
110 
111  uint32_t iaccel_integral_dt = (uint32_t)(accel_integral_dt * 1e6f);
112  memcpy(ptr + _accelerometer_integral_dt_offset_intern, &iaccel_integral_dt, sizeof(float));
113 
114  return data;
115 }
116 
117 void
118 Replay::setupReplayFile(const char *file_name)
119 {
120  if (_replay_file) {
121  free(_replay_file);
122  }
123 
124  _replay_file = strdup(file_name);
125 }
126 
127 void
128 Replay::setUserParams(const char *filename)
129 {
130  string line, param_name, value_string;
131  ifstream myfile(filename);
132 
133  if (!myfile.is_open()) {
134  return;
135  }
136 
137  PX4_INFO("Applying override params from %s...", filename);
138 
139  while (!myfile.eof()) {
140  getline(myfile, line);
141 
142  if (line.empty() || line[0] == '#') {
143  continue;
144  }
145 
146  istringstream mystrstream(line);
147  mystrstream >> param_name;
148  mystrstream >> value_string;
149 
150  double param_value_double = stod(value_string);
151 
152  param_t handle = param_find(param_name.c_str());
153  param_type_t param_format = param_type(handle);
154  _overridden_params.insert(param_name);
155 
156  if (param_format == PARAM_TYPE_INT32) {
157  int32_t value = 0;
158  value = (int32_t)param_value_double;
159  param_set(handle, (const void *)&value);
160 
161  } else if (param_format == PARAM_TYPE_FLOAT) {
162  float value = 0;
163  value = (float)param_value_double;
164  param_set(handle, (const void *)&value);
165  }
166  }
167 }
168 
169 bool
171 {
172  file.seekg(0);
173  ulog_file_header_s msg_header;
174  file.read((char *)&msg_header, sizeof(msg_header));
175 
176  if (!file) {
177  return false;
178  }
179 
180  _file_start_time = msg_header.timestamp;
181  //verify it's an ULog file
182  char magic[8];
183  magic[0] = 'U';
184  magic[1] = 'L';
185  magic[2] = 'o';
186  magic[3] = 'g';
187  magic[4] = 0x01;
188  magic[5] = 0x12;
189  magic[6] = 0x35;
190  return memcmp(magic, msg_header.magic, 7) == 0;
191 }
192 
193 bool
195 {
196  PX4_INFO("Applying params from ULog file...");
197 
198  ulog_message_header_s message_header;
199  file.seekg(sizeof(ulog_file_header_s));
200 
201  while (true) {
202  file.read((char *)&message_header, ULOG_MSG_HEADER_LEN);
203 
204  if (!file) {
205  return false;
206  }
207 
208  switch (message_header.msg_type) {
209  case (int)ULogMessageType::FLAG_BITS:
210  if (!readFlagBits(file, message_header.msg_size)) {
211  return false;
212  }
213 
214  break;
215 
216  case (int)ULogMessageType::FORMAT:
217  if (!readFormat(file, message_header.msg_size)) {
218  return false;
219  }
220 
221  break;
222 
223  case (int)ULogMessageType::PARAMETER:
224  if (!readAndApplyParameter(file, message_header.msg_size)) {
225  return false;
226  }
227 
228  break;
229 
231  _data_section_start = file.tellg() - (streamoff)ULOG_MSG_HEADER_LEN;
232  return true;
233 
234  case (int)ULogMessageType::INFO: //skip
235  case (int)ULogMessageType::INFO_MULTIPLE: //skip
236  file.seekg(message_header.msg_size, ios::cur);
237  break;
238 
239  default:
240  PX4_ERR("unknown log definition type %i, size %i (offset %i)",
241  (int)message_header.msg_type, (int)message_header.msg_size, (int)file.tellg());
242  file.seekg(message_header.msg_size, ios::cur);
243  break;
244  }
245  }
246 
247  return true;
248 }
249 
250 bool
251 Replay::readFlagBits(std::ifstream &file, uint16_t msg_size)
252 {
253  if (msg_size != 40) {
254  PX4_ERR("unsupported message length for FLAG_BITS message (%i)", msg_size);
255  return false;
256  }
257 
258  _read_buffer.reserve(msg_size);
259  uint8_t *message = (uint8_t *)_read_buffer.data();
260  file.read((char *)message, msg_size);
261  //uint8_t *compat_flags = message;
262  uint8_t *incompat_flags = message + 8;
263 
264  // handle & validate the flags
265  bool contains_appended_data = incompat_flags[0] & ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK;
266  bool has_unknown_incompat_bits = false;
267 
268  if (incompat_flags[0] & ~0x1) {
269  has_unknown_incompat_bits = true;
270  }
271 
272  for (int i = 1; i < 8; ++i) {
273  if (incompat_flags[i]) {
274  has_unknown_incompat_bits = true;
275  }
276  }
277 
278  if (has_unknown_incompat_bits) {
279  PX4_ERR("Log contains unknown incompat bits set. Refusing to parse");
280  return false;
281  }
282 
283  if (contains_appended_data) {
284  uint64_t appended_offsets[3];
285  memcpy(appended_offsets, message + 16, sizeof(appended_offsets));
286 
287  if (appended_offsets[0] > 0) {
288  // the appended data is currently only used for hardfault dumps, so it's safe to ignore it.
289  PX4_INFO("Log contains appended data. Replay will ignore this data");
290  _read_until_file_position = appended_offsets[0];
291  }
292  }
293 
294  return true;
295 }
296 
297 bool
298 Replay::readFormat(std::ifstream &file, uint16_t msg_size)
299 {
300  _read_buffer.reserve(msg_size + 1);
301  char *format = (char *)_read_buffer.data();
302  file.read(format, msg_size);
303  format[msg_size] = 0;
304 
305  if (!file) {
306  return false;
307  }
308 
309  string str_format(format);
310  size_t pos = str_format.find(':');
311 
312  if (pos == string::npos) {
313  return false;
314  }
315 
316  string name = str_format.substr(0, pos);
317  string fields = str_format.substr(pos + 1);
318  _file_formats[name] = fields;
319 
320  return true;
321 }
322 
323 bool
324 Replay::readAndAddSubscription(std::ifstream &file, uint16_t msg_size)
325 {
326  _read_buffer.reserve(msg_size + 1);
327  char *message = (char *)_read_buffer.data();
328  streampos this_message_pos = file.tellg() - (streamoff)ULOG_MSG_HEADER_LEN;
329  file.read(message, msg_size);
330  message[msg_size] = 0;
331 
332  if (!file) {
333  return false;
334  }
335 
336  if (file.tellg() <= _subscription_file_pos) { //already read this subscription
337  return true;
338  }
339 
340  _subscription_file_pos = file.tellg();
341 
342  uint8_t multi_id = *(uint8_t *)message;
343  uint16_t msg_id = ((uint16_t) message[1]) | (((uint16_t) message[2]) << 8);
344  string topic_name(message + 3);
345  const orb_metadata *orb_meta = findTopic(topic_name);
346 
347  if (!orb_meta) {
348  PX4_WARN("Topic %s not found internally. Will ignore it", topic_name.c_str());
349  return true;
350  }
351 
352  CompatBase *compat = nullptr;
353 
354  // check the format: the field definitions must match
355  // FIXME: this should check recursively, all used nested types
356  string file_format = _file_formats[topic_name];
357 
358  if (file_format != orb_meta->o_fields) {
359  // check if we have a compatibility conversion available
360  if (topic_name == "sensor_combined") {
361  if (string(orb_meta->o_fields) == "uint64_t timestamp;float[3] gyro_rad;uint32_t gyro_integral_dt;"
362  "int32_t accelerometer_timestamp_relative;float[3] accelerometer_m_s2;"
363  "uint32_t accelerometer_integral_dt" &&
364  file_format == "uint64_t timestamp;float[3] gyro_rad;float gyro_integral_dt;"
365  "int32_t accelerometer_timestamp_relative;float[3] accelerometer_m_s2;"
366  "float accelerometer_integral_dt;") {
367 
368  int gyro_integral_dt_offset_log;
369  int gyro_integral_dt_offset_intern;
370  int accelerometer_integral_dt_offset_log;
371  int accelerometer_integral_dt_offset_intern;
372  int unused;
373 
374  if (findFieldOffset(file_format, "gyro_integral_dt", gyro_integral_dt_offset_log, unused) &&
375  findFieldOffset(orb_meta->o_fields, "gyro_integral_dt", gyro_integral_dt_offset_intern, unused) &&
376  findFieldOffset(file_format, "accelerometer_integral_dt", accelerometer_integral_dt_offset_log, unused) &&
377  findFieldOffset(orb_meta->o_fields, "accelerometer_integral_dt", accelerometer_integral_dt_offset_intern, unused)) {
378 
379  compat = new CompatSensorCombinedDtType(gyro_integral_dt_offset_log, gyro_integral_dt_offset_intern,
380  accelerometer_integral_dt_offset_log, accelerometer_integral_dt_offset_intern);
381  }
382  }
383  }
384 
385  if (!compat) {
386  PX4_WARN("Formats for %s don't match. Will ignore it.", topic_name.c_str());
387  PX4_WARN(" Internal format: %s", orb_meta->o_fields);
388  PX4_WARN(" File format : %s", file_format.c_str());
389  return true; // not a fatal error
390  }
391  }
392 
393  Subscription *subscription = new Subscription();
394  subscription->orb_meta = orb_meta;
395  subscription->multi_id = multi_id;
396  subscription->compat = compat;
397 
398  //find the timestamp offset
399  int field_size;
400  bool timestamp_found = findFieldOffset(orb_meta->o_fields, "timestamp", subscription->timestamp_offset, field_size);
401 
402  if (!timestamp_found) {
403  return true;
404  }
405 
406  if (field_size != 8) {
407  PX4_ERR("Unsupported timestamp with size %i, ignoring the topic %s", field_size, orb_meta->o_name);
408  return true;
409  }
410 
411  //find first data message (and the timestamp)
412  streampos cur_pos = file.tellg();
413  subscription->next_read_pos = this_message_pos; //this will be skipped
414 
415  if (!nextDataMessage(file, *subscription, msg_id)) {
416  return false;
417  }
418 
419  file.seekg(cur_pos);
420 
421  if (!subscription->orb_meta) {
422  //no message found. This is not a fatal error
423  return true;
424  }
425 
426  PX4_DEBUG("adding subscription for %s (msg_id %i)", subscription->orb_meta->o_name, msg_id);
427 
428  //add subscription
429  if (_subscriptions.size() <= msg_id) {
430  _subscriptions.resize(msg_id + 1);
431  }
432 
433  _subscriptions[msg_id] = subscription;
434 
435  onSubscriptionAdded(*_subscriptions[msg_id], msg_id);
436 
437  return true;
438 }
439 
440 bool
441 Replay::findFieldOffset(const string &format, const string &field_name, int &offset, int &field_size)
442 {
443  size_t prev_field_end = 0;
444  size_t field_end = format.find(';');
445  offset = 0;
446  field_size = 0;
447 
448  while (field_end != string::npos) {
449  size_t space_pos = format.find(' ', prev_field_end);
450 
451  if (space_pos != string::npos) {
452  string type_name_full = format.substr(prev_field_end, space_pos - prev_field_end);
453  string cur_field_name = format.substr(space_pos + 1, field_end - space_pos - 1);
454 
455  if (cur_field_name == field_name) {
456  field_size = sizeOfFullType(type_name_full);
457  return true;
458 
459  } else {
460  offset += sizeOfFullType(type_name_full);
461  }
462  }
463 
464  prev_field_end = field_end + 1;
465  field_end = format.find(';', prev_field_end);
466  }
467 
468  return false;
469 }
470 
471 bool
472 Replay::readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position)
473 {
474  ulog_message_header_s message_header;
475 
476  while (file.tellg() < end_position) {
477  file.read((char *)&message_header, ULOG_MSG_HEADER_LEN);
478 
479  if (!file) {
480  return false;
481  }
482 
483  switch (message_header.msg_type) {
484  case (int)ULogMessageType::PARAMETER:
485  if (!readAndApplyParameter(file, message_header.msg_size)) {
486  return false;
487  }
488 
489  break;
490 
491  case (int)ULogMessageType::DROPOUT:
492  readDropout(file, message_header.msg_size);
493  break;
494 
495  default: //skip all others
496  file.seekg(message_header.msg_size, ios::cur);
497  break;
498  }
499  }
500 
501  return true;
502 }
503 
504 bool
505 Replay::readAndApplyParameter(std::ifstream &file, uint16_t msg_size)
506 {
507  _read_buffer.reserve(msg_size);
508  uint8_t *message = (uint8_t *)_read_buffer.data();
509  file.read((char *)message, msg_size);
510 
511  if (!file) {
512  return false;
513  }
514 
515  uint8_t key_len = message[0];
516  string key((char *)message + 1, key_len);
517 
518  size_t pos = key.find(' ');
519 
520  if (pos == string::npos) {
521  return false;
522  }
523 
524  string type = key.substr(0, pos);
525  string param_name = key.substr(pos + 1);
526 
527  if (_overridden_params.find(param_name) != _overridden_params.end()) {
528  //this parameter is overridden, so don't apply it
529  return true;
530  }
531 
532  if (type != "int32_t" && type != "float") {
533  PX4_WARN("unknown parameter type %s, name %s (ignoring it)", type.c_str(), param_name.c_str());
534  return true;
535  }
536 
537  param_t handle = param_find(param_name.c_str());
538 
539  if (handle != PARAM_INVALID) {
540  param_set(handle, (const void *)(message + 1 + key_len));
541  }
542 
543  return true;
544 }
545 
546 bool
547 Replay::readDropout(std::ifstream &file, uint16_t msg_size)
548 {
549  uint16_t duration;
550  file.read((char *)&duration, sizeof(duration));
551 
552  PX4_INFO("Dropout in replayed log, %i ms", (int)duration);
553  return file.good();
554 }
555 
556 bool
557 Replay::nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id)
558 {
559  ulog_message_header_s message_header;
560  file.seekg(subscription.next_read_pos);
561  //ignore the first message (it's data we already read)
562  file.read((char *)&message_header, ULOG_MSG_HEADER_LEN);
563 
564  if (file) {
565  file.seekg(message_header.msg_size, ios::cur);
566  }
567 
568  uint16_t file_msg_id;
569  bool done = false;
570 
571  while (file && !done) {
572  streampos cur_pos = file.tellg();
573  file.read((char *)&message_header, ULOG_MSG_HEADER_LEN);
574 
575  if (!file) {
576  break;
577  }
578 
579  if (((streamoff)cur_pos) + ULOG_MSG_HEADER_LEN + message_header.msg_size > _read_until_file_position) {
580  file.setstate(std::ios::eofbit);
581  break;
582  }
583 
584  switch (message_header.msg_type) {
586  readAndAddSubscription(file, message_header.msg_size);
587  break;
588 
589  case (int)ULogMessageType::DATA:
590  file.read((char *)&file_msg_id, sizeof(file_msg_id));
591 
592  if (file) {
593  if (msg_id == file_msg_id) {
594  if (message_header.msg_size == subscription.orb_meta->o_size_no_padding + 2) {
595  subscription.next_read_pos = cur_pos;
596  file.seekg(subscription.timestamp_offset, ios::cur);
597  file.read((char *)&subscription.next_timestamp, sizeof(subscription.next_timestamp));
598  done = true;
599 
600  } else { //sanity check failed!
601  PX4_ERR("data message %s has wrong size %i (expected %i). Skipping",
602  subscription.orb_meta->o_name, message_header.msg_size,
603  subscription.orb_meta->o_size_no_padding + 2);
604  file.seekg(message_header.msg_size - sizeof(file_msg_id), ios::cur);
605  }
606 
607  } else { //not the one we are looking for
608  file.seekg(message_header.msg_size - sizeof(file_msg_id), ios::cur);
609  }
610  }
611 
612  break;
613 
614  case (int)ULogMessageType::REMOVE_LOGGED_MSG: //skip these
615  case (int)ULogMessageType::PARAMETER:
616  case (int)ULogMessageType::DROPOUT:
617  case (int)ULogMessageType::INFO:
619  case (int)ULogMessageType::SYNC:
620  case (int)ULogMessageType::LOGGING:
621  file.seekg(message_header.msg_size, ios::cur);
622  break;
623 
624  default:
625  //this really should not happen
626  PX4_ERR("unknown log message type %i, size %i (offset %i)",
627  (int)message_header.msg_type, (int)message_header.msg_size, (int)file.tellg());
628  file.seekg(message_header.msg_size, ios::cur);
629  break;
630  }
631  }
632 
633  if (file.eof()) { //no more data messages for this subscription
634  subscription.orb_meta = nullptr;
635  file.clear();
636  }
637 
638  return file.good();
639 }
640 
641 const orb_metadata *
642 Replay::findTopic(const std::string &name)
643 {
644  const orb_metadata *const *topics = orb_get_topics();
645 
646  for (size_t i = 0; i < orb_topics_count(); i++) {
647  if (name == topics[i]->o_name) {
648  return topics[i];
649  }
650  }
651 
652  return nullptr;
653 }
654 
655 std::string
656 Replay::extractArraySize(const std::string &type_name_full, int &array_size)
657 {
658  size_t start_pos = type_name_full.find('[');
659  size_t end_pos = type_name_full.find(']');
660 
661  if (start_pos == string::npos || end_pos == string::npos) {
662  array_size = 1;
663  return type_name_full;
664  }
665 
666  array_size = atoi(type_name_full.substr(start_pos + 1, end_pos - start_pos - 1).c_str());
667  return type_name_full.substr(0, start_pos);
668 }
669 
670 size_t
671 Replay::sizeOfType(const std::string &type_name)
672 {
673  if (type_name == "int8_t" || type_name == "uint8_t") {
674  return 1;
675 
676  } else if (type_name == "int16_t" || type_name == "uint16_t") {
677  return 2;
678 
679  } else if (type_name == "int32_t" || type_name == "uint32_t") {
680  return 4;
681 
682  } else if (type_name == "int64_t" || type_name == "uint64_t") {
683  return 8;
684 
685  } else if (type_name == "float") {
686  return 4;
687 
688  } else if (type_name == "double") {
689  return 8;
690 
691  } else if (type_name == "char" || type_name == "bool") {
692  return 1;
693  }
694 
695  const orb_metadata *orb_meta = findTopic(type_name);
696 
697  if (orb_meta) {
698  return orb_meta->o_size;
699  }
700 
701  PX4_ERR("unknown type: %s", type_name.c_str());
702  return 0;
703 }
704 
705 size_t
706 Replay::sizeOfFullType(const std::string &type_name_full)
707 {
708  int array_size;
709  string type_name = extractArraySize(type_name_full, array_size);
710  return sizeOfType(type_name) * array_size;
711 }
712 
713 bool
715 {
716  // log reader currently assumes little endian
717  int num = 1;
718 
719  if (*(char *)&num != 1) {
720  PX4_ERR("Replay only works on little endian!");
721  return false;
722  }
723 
724  if (!file.is_open()) {
725  PX4_ERR("Failed to open replay file");
726  return false;
727  }
728 
729  if (!readFileHeader(file)) {
730  PX4_ERR("Failed to read file header. Not a valid ULog file");
731  return false;
732  }
733 
734  //initialize the formats and apply the parameters from the log file
735  if (!readFileDefinitions(file)) {
736  PX4_ERR("Failed to read ULog definitions section. Broken file?");
737  return false;
738  }
739 
741  return true;
742 }
743 
744 void
746 {
747  ifstream replay_file(_replay_file, ios::in | ios::binary);
748 
749  if (!readDefinitionsAndApplyParams(replay_file)) {
750  return;
751  }
752 
753  onEnterMainLoop();
754 
756 
757  PX4_INFO("Replay in progress...");
758 
759  ulog_message_header_s message_header;
760  replay_file.seekg(_data_section_start);
761 
762  //we know the next message must be an ADD_LOGGED_MSG
763  replay_file.read((char *)&message_header, ULOG_MSG_HEADER_LEN);
764 
765  if (!readAndAddSubscription(replay_file, message_header.msg_size)) {
766  PX4_ERR("Failed to read subscription");
767  return;
768  }
769 
770  //we update the timestamps from the file by a constant offset to match
771  //the current replay time
772  const uint64_t timestamp_offset = _replay_start_time - _file_start_time;
773  uint32_t nr_published_messages = 0;
774  streampos last_additional_message_pos = _data_section_start;
775 
776  while (!should_exit() && replay_file) {
777 
778  //Find the next message to publish. Messages from different subscriptions don't need
779  //to be in chronological order, so we need to check all subscriptions
780  uint64_t next_file_time = 0;
781  int next_msg_id = -1;
782  bool first_time = true;
783 
784  for (size_t i = 0; i < _subscriptions.size(); ++i) {
785  const Subscription *subscription = _subscriptions[i];
786 
787  if (!subscription) {
788  continue;
789  }
790 
791  if (subscription->orb_meta && !subscription->ignored) {
792  if (first_time || subscription->next_timestamp < next_file_time) {
793  first_time = false;
794  next_msg_id = (int)i;
795  next_file_time = subscription->next_timestamp;
796  }
797  }
798  }
799 
800  if (next_msg_id == -1) {
801  break; //no active subscription anymore. We're done.
802  }
803 
804  Subscription &sub = *_subscriptions[next_msg_id];
805 
806  if (next_file_time == 0) {
807  //someone didn't set the timestamp properly. Consider the message invalid
808  nextDataMessage(replay_file, sub, next_msg_id);
809  continue;
810  }
811 
812  //handle additional messages between last and next published data
813  replay_file.seekg(last_additional_message_pos);
814  streampos next_additional_message_pos = sub.next_read_pos;
815  readAndHandleAdditionalMessages(replay_file, next_additional_message_pos);
816  last_additional_message_pos = next_additional_message_pos;
817 
818  const uint64_t publish_timestamp = handleTopicDelay(next_file_time, timestamp_offset);
819 
820  // It's time to publish
821  readTopicDataToBuffer(sub, replay_file);
822  memcpy(_read_buffer.data() + sub.timestamp_offset, &publish_timestamp, sizeof(uint64_t)); //adjust the timestamp
823 
824  if (handleTopicUpdate(sub, _read_buffer.data(), replay_file)) {
825  ++nr_published_messages;
826  }
827 
828  nextDataMessage(replay_file, sub, next_msg_id);
829 
830  // TODO: output status (eg. every sec), including total duration...
831  }
832 
833  for (auto &subscription : _subscriptions) {
834  if (!subscription) {
835  continue;
836  }
837 
838  if (subscription->compat) {
839  delete subscription->compat;
840  subscription->compat = nullptr;
841  }
842 
843  if (subscription->orb_advert) {
844  orb_unadvertise(subscription->orb_advert);
845  subscription->orb_advert = nullptr;
846  }
847  }
848 
849  if (!should_exit()) {
850  PX4_INFO("Replay done (published %u msgs, %.3lf s)", nr_published_messages,
851  (double)hrt_elapsed_time(&_replay_start_time) / 1.e6);
852 
853  //TODO: add parameter -q?
854  replay_file.close();
855  px4_shutdown_request(false, false);
856  }
857 
858  onExitMainLoop();
859 }
860 
861 void
862 Replay::readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file)
863 {
864  const size_t msg_read_size = sub.orb_meta->o_size_no_padding;
865  const size_t msg_write_size = sub.orb_meta->o_size;
866  _read_buffer.reserve(msg_write_size);
867  replay_file.seekg(sub.next_read_pos + (streamoff)(ULOG_MSG_HEADER_LEN + 2)); //skip header & msg id
868  replay_file.read((char *)_read_buffer.data(), msg_read_size);
869 }
870 
871 bool
872 Replay::handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file)
873 {
874  return publishTopic(sub, data);
875 }
876 
877 uint64_t
878 Replay::handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset)
879 {
880  const uint64_t publish_timestamp = next_file_time + timestamp_offset;
881 
882  // wait if necessary
883  uint64_t cur_time = hrt_absolute_time();
884 
885  // if some topics have a timestamp smaller than the log file start, publish them immediately
886  if (cur_time < publish_timestamp && next_file_time > _file_start_time) {
887  px4_usleep(publish_timestamp - cur_time);
888  }
889 
890  return publish_timestamp;
891 }
892 
893 bool
895 {
896  bool published = false;
897 
898  if (sub.compat) {
899  data = sub.compat->apply(data);
900  }
901 
902  if (sub.orb_advert) {
903  orb_publish(sub.orb_meta, sub.orb_advert, data);
904  published = true;
905 
906  } else {
907  if (sub.multi_id == 0) {
908  sub.orb_advert = orb_advertise(sub.orb_meta, data);
909  published = true;
910 
911  } else {
912  // make sure the other instances are advertised already so that we get the correct instance
913  bool advertised = false;
914 
915  for (const auto &subscription : _subscriptions) {
916  if (!subscription) {
917  continue;
918  }
919 
920  if (subscription->orb_meta) {
921  if (strcmp(sub.orb_meta->o_name, subscription->orb_meta->o_name) == 0 &&
922  subscription->orb_advert && subscription->multi_id == sub.multi_id - 1) {
923  advertised = true;
924  }
925  }
926  }
927 
928  if (advertised) {
929  int instance;
930  sub.orb_advert = orb_advertise_multi(sub.orb_meta, data, &instance, ORB_PRIO_DEFAULT);
931  published = true;
932  }
933  }
934  }
935 
936  if (published) {
937  ++sub.publication_counter;
938  }
939 
940  return published;
941 }
942 
943 int
944 Replay::custom_command(int argc, char *argv[])
945 {
946  if (!strcmp(argv[0], "tryapplyparams")) {
947  return Replay::applyParams(true);
948  }
949 
950  if (!strcmp(argv[0], "trystart")) {
951  return Replay::task_spawn(argc, argv);
952  }
953 
954  return print_usage("unknown command");
955 }
956 
957 int
958 Replay::task_spawn(int argc, char *argv[])
959 {
960  // check if a log file was found
961  if (!isSetup()) {
962  if (argc > 0 && strncmp(argv[0], "try", 3) == 0) {
963  return 0;
964  }
965 
966  PX4_ERR("no log file given (via env variable %s)", replay::ENV_FILENAME);
967  return -1;
968  }
969 
970  _task_id = px4_task_spawn_cmd("replay",
971  SCHED_DEFAULT,
972  SCHED_PRIORITY_MAX - 5,
973  4000,
974  (px4_main_t)&run_trampoline,
975  (char *const *)argv);
976 
977  if (_task_id < 0) {
978  _task_id = -1;
979  return -errno;
980  }
981 
982  return 0;
983 }
984 
985 int
987 {
988  if (!isSetup()) {
989  if (quiet) {
990  return 0;
991  }
992 
993  PX4_ERR("no log file given (via env variable %s)", replay::ENV_FILENAME);
994  return -1;
995  }
996 
997  int ret = 0;
998  Replay *r = new Replay();
999 
1000  if (r == nullptr) {
1001  PX4_ERR("alloc failed");
1002  return -ENOMEM;
1003  }
1004 
1005  ifstream replay_file(_replay_file, ios::in | ios::binary);
1006 
1007  if (!r->readDefinitionsAndApplyParams(replay_file)) {
1008  ret = -1;
1009  }
1010 
1011  delete r;
1012 
1013  return ret;
1014 }
1015 
1016 Replay *
1017 Replay::instantiate(int argc, char *argv[])
1018 {
1019  // check the replay mode
1020  const char *replay_mode = getenv(replay::ENV_MODE);
1021 
1022  Replay *instance = nullptr;
1023 
1024  if (replay_mode && strcmp(replay_mode, "ekf2") == 0) {
1025  PX4_INFO("Ekf2 replay mode");
1026  instance = new ReplayEkf2();
1027 
1028  } else {
1029  instance = new Replay();
1030  }
1031 
1032  return instance;
1033 }
1034 
1035 int
1036 Replay::print_usage(const char *reason)
1037 {
1038  if (reason) {
1039  PX4_WARN("%s\n", reason);
1040  }
1041 
1042  PRINT_MODULE_DESCRIPTION(
1043  R"DESCR_STR(
1044 ### Description
1045 This module is used to replay ULog files.
1046 
1047 There are 2 environment variables used for configuration: `replay`, which must be set to an ULog file name - it's
1048 the log file to be replayed. The second is the mode, specified via `replay_mode`:
1049 - `replay_mode=ekf2`: specific EKF2 replay mode. It can only be used with the ekf2 module, but allows the replay
1050  to run as fast as possible.
1051 - Generic otherwise: this can be used to replay any module(s), but the replay will be done with the same speed as the
1052  log was recorded.
1053 
1054 The module is typically used together with uORB publisher rules, to specify which messages should be replayed.
1055 The replay module will just publish all messages that are found in the log. It also applies the parameters from
1056 the log.
1057 
1058 The replay procedure is documented on the [System-wide Replay](https://dev.px4.io/en/debug/system_wide_replay.html)
1059 page.
1060 )DESCR_STR");
1061 
1062  PRINT_MODULE_USAGE_NAME("replay", "system");
1063  PRINT_MODULE_USAGE_COMMAND_DESCR("start", "Start replay, using log file from ENV variable 'replay'");
1064  PRINT_MODULE_USAGE_COMMAND_DESCR("trystart", "Same as 'start', but silently exit if no log file given");
1065  PRINT_MODULE_USAGE_COMMAND_DESCR("tryapplyparams", "Try to apply the parameters from the log file");
1066  PRINT_MODULE_USAGE_DEFAULT_COMMANDS();
1067 
1068  return 0;
1069 }
1070 
1071 } //namespace px4
#define PARAM_INVALID
Handle returned when a parameter cannot be found.
Definition: param.h:103
replay specialization for Ekf2 replay
Definition: ReplayEkf2.hpp:45
const uint16_t o_size_no_padding
object size w/o padding at the end (for logger)
Definition: uORB.h:53
virtual void * apply(void *data)=0
apply compatibility to a topic
void readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file)
read a topic from the file (offset given by the subscription) into _read_buffer
Definition: Replay.cpp:862
static int applyParams(bool quiet)
Apply the parameters from the log.
Definition: Replay.cpp:986
const struct orb_metadata *const * orb_get_topics() __EXPORT
static Replay * instantiate(int argc, char *argv[])
Definition: Replay.cpp:1017
bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id)
Find next data message for this subscription, starting with the stored file offset.
Definition: Replay.cpp:557
#define PARAM_TYPE_INT32
Parameter types.
Definition: param.h:60
std::streampos _subscription_file_pos
keep track of file position to avoid adding a subscription multiple times.
Definition: Replay.hpp:221
uint16_t param_type_t
Definition: param.h:66
bool readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position)
Read and handle additional messages starting at current file position, while position < end_position...
Definition: Replay.cpp:472
__EXPORT int param_set(param_t param, const void *val)
Set the value of a parameter.
Definition: parameters.cpp:814
std::vector< uint8_t > _read_buffer
Definition: Replay.hpp:210
std::map< std::string, std::string > _file_formats
all formats we read from the file
Definition: Replay.hpp:214
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
Definition: uORB.cpp:43
STL namespace.
Parses an ULog file and replays it in &#39;real-time&#39;.
Definition: Replay.hpp:58
bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size)
Definition: Replay.cpp:505
LidarLite * instance
Definition: ll40ls.cpp:65
virtual void onExitMainLoop()
called when exiting the main replay loop
Definition: Replay.hpp:173
High-resolution timer with callouts and timekeeping.
static char * _replay_file
Definition: Replay.hpp:269
const char * o_name
unique object name
Definition: uORB.h:51
bool readFlagBits(std::ifstream &file, uint16_t msg_size)
Definition: Replay.cpp:251
Global flash based parameter store.
bool readAndAddSubscription(std::ifstream &file, uint16_t msg_size)
Definition: Replay.cpp:324
virtual uint64_t handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset)
handle delay until topic can be published.
Definition: Replay.cpp:878
std::vector< Subscription * > _subscriptions
Definition: Replay.hpp:209
int timestamp_offset
marks the field of the timestamp
Definition: Replay.hpp:133
static int print_usage(const char *reason=nullptr)
Definition: Replay.cpp:1036
uint64_t _file_start_time
Definition: Replay.hpp:216
Replay()=default
__EXPORT const char * param_name(param_t param)
Obtain the name of a parameter.
Definition: parameters.cpp:486
bool ignored
if true, it will not be considered for publication in the main loop
Definition: Replay.hpp:135
CompatSensorCombinedDtType(int gyro_integral_dt_offset_log, int gyro_integral_dt_offset_intern, int accelerometer_integral_dt_offset_log, int accelerometer_integral_dt_offset_intern)
Definition: Replay.cpp:77
int64_t _read_until_file_position
read limit if log contains appended data
Definition: Replay.hpp:223
static int custom_command(int argc, char *argv[])
Definition: Replay.cpp:944
static std::string extractArraySize(const std::string &type_name_full, int &array_size)
get the array size from a type.
Definition: Replay.cpp:656
uint8_t * data
Definition: dataman.cpp:149
std::streampos next_read_pos
Definition: Replay.hpp:137
first bytes of the file
Definition: messages.h:56
virtual void onEnterMainLoop()
called when entering the main replay loop
Definition: Replay.hpp:168
static hrt_abstime hrt_elapsed_time(const hrt_abstime *then)
Compute the delta between a timestamp taken in the past and now.
Definition: drv_hrt.h:102
#define ULOG_MSG_HEADER_LEN
Definition: messages.h:61
__EXPORT param_type_t param_type(param_t param)
Obtain the type of a parameter.
Definition: parameters.cpp:519
const orb_metadata * orb_meta
if nullptr, this subscription is invalid
Definition: Replay.hpp:130
static bool findFieldOffset(const std::string &format, const std::string &field_name, int &offset, int &field_size)
Find the offset & field size in bytes for a given field name.
Definition: Replay.cpp:441
virtual void onSubscriptionAdded(Subscription &sub, uint16_t msg_id)
called when a new subscription is added
Definition: Replay.hpp:178
#define PARAMS_OVERRIDE_FILE
Definition: Replay.cpp:67
int orb_publish(const struct orb_metadata *meta, orb_advert_t handle, const void *data)
Definition: uORB.cpp:70
bool readDropout(std::ifstream &file, uint16_t msg_size)
Definition: Replay.cpp:547
#define ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK
Definition: messages.h:156
bool readFileDefinitions(std::ifstream &file)
Read definitions section: check formats, apply parameters and store the start of the data section...
Definition: Replay.cpp:194
uint64_t _replay_start_time
Definition: Replay.hpp:217
__EXPORT param_t param_find(const char *name)
Look up a parameter by name.
Definition: parameters.cpp:370
void run() override
Definition: Replay.cpp:745
const char * name
Definition: tests_main.c:58
#define PARAM_TYPE_FLOAT
Definition: param.h:61
Object metadata.
Definition: uORB.h:50
virtual ~Replay()
Definition: Replay.cpp:87
const char * o_fields
semicolon separated list of fields (with type)
Definition: uORB.h:54
static int task_spawn(int argc, char *argv[])
Definition: Replay.cpp:958
void * apply(void *data) override
apply compatibility to a topic
Definition: Replay.cpp:97
bool publishTopic(Subscription &sub, void *data)
publish an orb topic
Definition: Replay.cpp:894
std::streampos _data_section_start
first ADD_LOGGED_MSG message
Definition: Replay.hpp:218
struct @83::@85::@87 file
std::set< std::string > _overridden_params
Definition: Replay.hpp:213
uint64_t next_timestamp
timestamp of the file
Definition: Replay.hpp:138
static bool isSetup()
Definition: Replay.hpp:94
static const orb_metadata * findTopic(const std::string &name)
Definition: Replay.cpp:642
static size_t sizeOfFullType(const std::string &type_name_full)
get the size of a type that can be an array
Definition: Replay.cpp:706
int orb_unadvertise(orb_advert_t handle)
Definition: uORB.cpp:65
Definition: bst.cpp:62
static size_t sizeOfType(const std::string &type_name)
get the size of a type that is not an array
Definition: Replay.cpp:671
const uint16_t o_size
object size
Definition: uORB.h:52
bool readFileHeader(std::ifstream &file)
Definition: Replay.cpp:170
size_t orb_topics_count() __EXPORT
bool readDefinitionsAndApplyParams(std::ifstream &file)
Read the file header and definitions sections.
Definition: Replay.cpp:714
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, int priority)
Definition: uORB.cpp:53
static void setupReplayFile(const char *file_name)
Tell the replay module that we want to use replay mode.
Definition: Replay.cpp:118
void setUserParams(const char *filename)
Definition: Replay.cpp:128
bool readFormat(std::ifstream &file, uint16_t msg_size)
file parsing methods. They return false, when further parsing should be aborted.
Definition: Replay.cpp:298
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
uint32_t param_t
Parameter handle.
Definition: param.h:98
orb_advert_t orb_advert
Definition: Replay.hpp:131
virtual bool handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file)
handle the publication of a topic update
Definition: Replay.cpp:872