44 #include <px4_platform_common/defines.h> 45 #include <px4_platform_common/posix.h> 46 #include <px4_platform_common/tasks.h> 47 #include <px4_platform_common/time.h> 48 #include <px4_platform_common/shutdown.h> 67 #define PARAMS_OVERRIDE_FILE PX4_ROOTFSDIR "/replay_params.txt" 75 char *Replay::_replay_file =
nullptr;
77 Replay::CompatSensorCombinedDtType::CompatSensorCombinedDtType(
int gyro_integral_dt_offset_log,
78 int gyro_integral_dt_offset_intern,
int accelerometer_integral_dt_offset_log,
79 int accelerometer_integral_dt_offset_intern)
80 : _gyro_integral_dt_offset_log(gyro_integral_dt_offset_log),
81 _gyro_integral_dt_offset_intern(gyro_integral_dt_offset_intern),
82 _accelerometer_integral_dt_offset_log(accelerometer_integral_dt_offset_log),
83 _accelerometer_integral_dt_offset_intern(accelerometer_integral_dt_offset_intern)
100 uint8_t *ptr = (uint8_t *)data;
102 float gyro_integral_dt;
105 float accel_integral_dt;
108 uint32_t igyro_integral_dt = (uint32_t)(gyro_integral_dt * 1e6f);
111 uint32_t iaccel_integral_dt = (uint32_t)(accel_integral_dt * 1e6f);
131 ifstream myfile(filename);
133 if (!myfile.is_open()) {
137 PX4_INFO(
"Applying override params from %s...", filename);
139 while (!myfile.eof()) {
140 getline(myfile, line);
142 if (line.empty() || line[0] ==
'#') {
146 istringstream mystrstream(line);
148 mystrstream >> value_string;
150 double param_value_double = stod(value_string);
158 value = (int32_t)param_value_double;
163 value = (float)param_value_double;
174 file.read((
char *)&msg_header,
sizeof(msg_header));
190 return memcmp(magic, msg_header.magic, 7) == 0;
196 PX4_INFO(
"Applying params from ULog file...");
208 switch (message_header.msg_type) {
217 if (!
readFormat(file, message_header.msg_size)) {
236 file.seekg(message_header.msg_size, ios::cur);
240 PX4_ERR(
"unknown log definition type %i, size %i (offset %i)",
241 (
int)message_header.msg_type, (
int)message_header.msg_size, (
int)file.tellg());
242 file.seekg(message_header.msg_size, ios::cur);
253 if (msg_size != 40) {
254 PX4_ERR(
"unsupported message length for FLAG_BITS message (%i)", msg_size);
260 file.read((
char *)message, msg_size);
262 uint8_t *incompat_flags = message + 8;
266 bool has_unknown_incompat_bits =
false;
268 if (incompat_flags[0] & ~0x1) {
269 has_unknown_incompat_bits =
true;
272 for (
int i = 1; i < 8; ++i) {
273 if (incompat_flags[i]) {
274 has_unknown_incompat_bits =
true;
278 if (has_unknown_incompat_bits) {
279 PX4_ERR(
"Log contains unknown incompat bits set. Refusing to parse");
283 if (contains_appended_data) {
284 uint64_t appended_offsets[3];
285 memcpy(appended_offsets, message + 16,
sizeof(appended_offsets));
287 if (appended_offsets[0] > 0) {
289 PX4_INFO(
"Log contains appended data. Replay will ignore this data");
302 file.read(format, msg_size);
303 format[msg_size] = 0;
309 string str_format(format);
310 size_t pos = str_format.find(
':');
312 if (pos == string::npos) {
316 string name = str_format.substr(0, pos);
317 string fields = str_format.substr(pos + 1);
329 file.read(message, msg_size);
330 message[msg_size] = 0;
342 uint8_t multi_id = *(uint8_t *)message;
343 uint16_t msg_id = ((uint16_t) message[1]) | (((uint16_t) message[2]) << 8);
344 string topic_name(message + 3);
348 PX4_WARN(
"Topic %s not found internally. Will ignore it", topic_name.c_str());
358 if (file_format != orb_meta->
o_fields) {
360 if (topic_name ==
"sensor_combined") {
361 if (
string(orb_meta->
o_fields) ==
"uint64_t timestamp;float[3] gyro_rad;uint32_t gyro_integral_dt;" 362 "int32_t accelerometer_timestamp_relative;float[3] accelerometer_m_s2;" 363 "uint32_t accelerometer_integral_dt" &&
364 file_format ==
"uint64_t timestamp;float[3] gyro_rad;float gyro_integral_dt;" 365 "int32_t accelerometer_timestamp_relative;float[3] accelerometer_m_s2;" 366 "float accelerometer_integral_dt;") {
368 int gyro_integral_dt_offset_log;
369 int gyro_integral_dt_offset_intern;
370 int accelerometer_integral_dt_offset_log;
371 int accelerometer_integral_dt_offset_intern;
374 if (
findFieldOffset(file_format,
"gyro_integral_dt", gyro_integral_dt_offset_log, unused) &&
376 findFieldOffset(file_format,
"accelerometer_integral_dt", accelerometer_integral_dt_offset_log, unused) &&
377 findFieldOffset(orb_meta->
o_fields,
"accelerometer_integral_dt", accelerometer_integral_dt_offset_intern, unused)) {
380 accelerometer_integral_dt_offset_log, accelerometer_integral_dt_offset_intern);
386 PX4_WARN(
"Formats for %s don't match. Will ignore it.", topic_name.c_str());
387 PX4_WARN(
" Internal format: %s", orb_meta->
o_fields);
388 PX4_WARN(
" File format : %s", file_format.c_str());
396 subscription->
compat = compat;
402 if (!timestamp_found) {
406 if (field_size != 8) {
407 PX4_ERR(
"Unsupported timestamp with size %i, ignoring the topic %s", field_size, orb_meta->
o_name);
412 streampos cur_pos = file.tellg();
426 PX4_DEBUG(
"adding subscription for %s (msg_id %i)", subscription->
orb_meta->
o_name, msg_id);
443 size_t prev_field_end = 0;
444 size_t field_end = format.find(
';');
448 while (field_end != string::npos) {
449 size_t space_pos = format.find(
' ', prev_field_end);
451 if (space_pos != string::npos) {
452 string type_name_full = format.substr(prev_field_end, space_pos - prev_field_end);
453 string cur_field_name = format.substr(space_pos + 1, field_end - space_pos - 1);
455 if (cur_field_name == field_name) {
464 prev_field_end = field_end + 1;
465 field_end = format.find(
';', prev_field_end);
476 while (file.tellg() < end_position) {
483 switch (message_header.msg_type) {
496 file.seekg(message_header.msg_size, ios::cur);
509 file.read((
char *)message, msg_size);
515 uint8_t key_len = message[0];
516 string key((
char *)message + 1, key_len);
518 size_t pos = key.find(
' ');
520 if (pos == string::npos) {
524 string type = key.substr(0, pos);
532 if (type !=
"int32_t" && type !=
"float") {
533 PX4_WARN(
"unknown parameter type %s, name %s (ignoring it)", type.c_str(), param_name.c_str());
540 param_set(handle, (
const void *)(message + 1 + key_len));
550 file.read((
char *)&duration,
sizeof(duration));
552 PX4_INFO(
"Dropout in replayed log, %i ms", (
int)duration);
565 file.seekg(message_header.msg_size, ios::cur);
568 uint16_t file_msg_id;
571 while (file && !done) {
572 streampos cur_pos = file.tellg();
580 file.setstate(std::ios::eofbit);
584 switch (message_header.msg_type) {
590 file.read((
char *)&file_msg_id,
sizeof(file_msg_id));
593 if (msg_id == file_msg_id) {
601 PX4_ERR(
"data message %s has wrong size %i (expected %i). Skipping",
604 file.seekg(message_header.msg_size -
sizeof(file_msg_id), ios::cur);
608 file.seekg(message_header.msg_size -
sizeof(file_msg_id), ios::cur);
621 file.seekg(message_header.msg_size, ios::cur);
626 PX4_ERR(
"unknown log message type %i, size %i (offset %i)",
627 (
int)message_header.msg_type, (
int)message_header.msg_size, (
int)file.tellg());
628 file.seekg(message_header.msg_size, ios::cur);
647 if (name == topics[i]->o_name) {
658 size_t start_pos = type_name_full.find(
'[');
659 size_t end_pos = type_name_full.find(
']');
661 if (start_pos == string::npos || end_pos == string::npos) {
663 return type_name_full;
666 array_size = atoi(type_name_full.substr(start_pos + 1, end_pos - start_pos - 1).c_str());
667 return type_name_full.substr(0, start_pos);
673 if (type_name ==
"int8_t" || type_name ==
"uint8_t") {
676 }
else if (type_name ==
"int16_t" || type_name ==
"uint16_t") {
679 }
else if (type_name ==
"int32_t" || type_name ==
"uint32_t") {
682 }
else if (type_name ==
"int64_t" || type_name ==
"uint64_t") {
685 }
else if (type_name ==
"float") {
688 }
else if (type_name ==
"double") {
691 }
else if (type_name ==
"char" || type_name ==
"bool") {
701 PX4_ERR(
"unknown type: %s", type_name.c_str());
719 if (*(
char *)&num != 1) {
720 PX4_ERR(
"Replay only works on little endian!");
724 if (!file.is_open()) {
725 PX4_ERR(
"Failed to open replay file");
730 PX4_ERR(
"Failed to read file header. Not a valid ULog file");
736 PX4_ERR(
"Failed to read ULog definitions section. Broken file?");
747 ifstream replay_file(
_replay_file, ios::in | ios::binary);
757 PX4_INFO(
"Replay in progress...");
766 PX4_ERR(
"Failed to read subscription");
773 uint32_t nr_published_messages = 0;
776 while (!should_exit() && replay_file) {
780 uint64_t next_file_time = 0;
781 int next_msg_id = -1;
782 bool first_time =
true;
792 if (first_time || subscription->
next_timestamp < next_file_time) {
794 next_msg_id = (int)i;
800 if (next_msg_id == -1) {
806 if (next_file_time == 0) {
813 replay_file.seekg(last_additional_message_pos);
816 last_additional_message_pos = next_additional_message_pos;
818 const uint64_t publish_timestamp =
handleTopicDelay(next_file_time, timestamp_offset);
825 ++nr_published_messages;
838 if (subscription->compat) {
839 delete subscription->compat;
840 subscription->compat =
nullptr;
843 if (subscription->orb_advert) {
845 subscription->orb_advert =
nullptr;
849 if (!should_exit()) {
850 PX4_INFO(
"Replay done (published %u msgs, %.3lf s)", nr_published_messages,
855 px4_shutdown_request(
false,
false);
868 replay_file.read((
char *)
_read_buffer.data(), msg_read_size);
880 const uint64_t publish_timestamp = next_file_time + timestamp_offset;
887 px4_usleep(publish_timestamp - cur_time);
890 return publish_timestamp;
896 bool published =
false;
913 bool advertised =
false;
920 if (subscription->orb_meta) {
921 if (strcmp(sub.
orb_meta->
o_name, subscription->orb_meta->o_name) == 0 &&
922 subscription->orb_advert && subscription->multi_id == sub.
multi_id - 1) {
946 if (!strcmp(argv[0],
"tryapplyparams")) {
950 if (!strcmp(argv[0],
"trystart")) {
962 if (argc > 0 && strncmp(argv[0],
"try", 3) == 0) {
966 PX4_ERR(
"no log file given (via env variable %s)", replay::ENV_FILENAME);
970 _task_id = px4_task_spawn_cmd(
"replay",
972 SCHED_PRIORITY_MAX - 5,
974 (px4_main_t)&run_trampoline,
975 (
char *
const *)argv);
993 PX4_ERR(
"no log file given (via env variable %s)", replay::ENV_FILENAME);
1001 PX4_ERR(
"alloc failed");
1005 ifstream replay_file(
_replay_file, ios::in | ios::binary);
1020 const char *replay_mode = getenv(replay::ENV_MODE);
1024 if (replay_mode && strcmp(replay_mode,
"ekf2") == 0) {
1025 PX4_INFO(
"Ekf2 replay mode");
1039 PX4_WARN(
"%s\n", reason);
1042 PRINT_MODULE_DESCRIPTION(
1045 This module is used to replay ULog files. 1047 There are 2 environment variables used for configuration: `replay`, which must be set to an ULog file name - it's 1048 the log file to be replayed. The second is the mode, specified via `replay_mode`: 1049 - `replay_mode=ekf2`: specific EKF2 replay mode. It can only be used with the ekf2 module, but allows the replay 1050 to run as fast as possible. 1051 - Generic otherwise: this can be used to replay any module(s), but the replay will be done with the same speed as the 1054 The module is typically used together with uORB publisher rules, to specify which messages should be replayed. 1055 The replay module will just publish all messages that are found in the log. It also applies the parameters from 1058 The replay procedure is documented on the [System-wide Replay](https://dev.px4.io/en/debug/system_wide_replay.html) 1062 PRINT_MODULE_USAGE_NAME("replay",
"system");
1063 PRINT_MODULE_USAGE_COMMAND_DESCR(
"start",
"Start replay, using log file from ENV variable 'replay'");
1064 PRINT_MODULE_USAGE_COMMAND_DESCR(
"trystart",
"Same as 'start', but silently exit if no log file given");
1065 PRINT_MODULE_USAGE_COMMAND_DESCR(
"tryapplyparams",
"Try to apply the parameters from the log file");
1066 PRINT_MODULE_USAGE_DEFAULT_COMMANDS();
#define PARAM_INVALID
Handle returned when a parameter cannot be found.
int _accelerometer_integral_dt_offset_log
replay specialization for Ekf2 replay
virtual void * apply(void *data)=0
apply compatibility to a topic
int _accelerometer_integral_dt_offset_intern
void readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file)
read a topic from the file (offset given by the subscription) into _read_buffer
static int applyParams(bool quiet)
Apply the parameters from the log.
const struct orb_metadata *const * orb_get_topics() __EXPORT
static Replay * instantiate(int argc, char *argv[])
bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id)
Find next data message for this subscription, starting with the stored file offset.
#define PARAM_TYPE_INT32
Parameter types.
std::streampos _subscription_file_pos
keep track of file position to avoid adding a subscription multiple times.
int _gyro_integral_dt_offset_log
bool readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position)
Read and handle additional messages starting at the current file position, while position < end_position...
__EXPORT int param_set(param_t param, const void *val)
Set the value of a parameter.
std::vector< uint8_t > _read_buffer
std::map< std::string, std::string > _file_formats
all formats we read from the file
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
Parses a ULog file and replays it in 'real-time'.
bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size)
virtual void onExitMainLoop()
called when exiting the main replay loop
High-resolution timer with callouts and timekeeping.
static char * _replay_file
bool readFlagBits(std::ifstream &file, uint16_t msg_size)
Global flash based parameter store.
bool readAndAddSubscription(std::ifstream &file, uint16_t msg_size)
virtual uint64_t handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset)
handle delay until topic can be published.
std::vector< Subscription * > _subscriptions
int timestamp_offset
marks the field of the timestamp
static int print_usage(const char *reason=nullptr)
uint64_t _file_start_time
__EXPORT const char * param_name(param_t param)
Obtain the name of a parameter.
bool ignored
if true, it will not be considered for publication in the main loop
CompatSensorCombinedDtType(int gyro_integral_dt_offset_log, int gyro_integral_dt_offset_intern, int accelerometer_integral_dt_offset_log, int accelerometer_integral_dt_offset_intern)
int64_t _read_until_file_position
read limit if log contains appended data
static int custom_command(int argc, char *argv[])
static std::string extractArraySize(const std::string &type_name_full, int &array_size)
get the array size from a type.
std::streampos next_read_pos
virtual void onEnterMainLoop()
called when entering the main replay loop
static hrt_abstime hrt_elapsed_time(const hrt_abstime *then)
Compute the delta between a timestamp taken in the past and now.
#define ULOG_MSG_HEADER_LEN
__EXPORT param_type_t param_type(param_t param)
Obtain the type of a parameter.
const orb_metadata * orb_meta
if nullptr, this subscription is invalid
static bool findFieldOffset(const std::string &format, const std::string &field_name, int &offset, int &field_size)
Find the offset & field size in bytes for a given field name.
virtual void onSubscriptionAdded(Subscription &sub, uint16_t msg_id)
called when a new subscription is added
#define PARAMS_OVERRIDE_FILE
int orb_publish(const struct orb_metadata *meta, orb_advert_t handle, const void *data)
bool readDropout(std::ifstream &file, uint16_t msg_size)
#define ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK
bool readFileDefinitions(std::ifstream &file)
Read definitions section: check formats, apply parameters and store the start of the data section...
uint64_t _replay_start_time
__EXPORT param_t param_find(const char *name)
Look up a parameter by name.
static int task_spawn(int argc, char *argv[])
void * apply(void *data) override
apply compatibility to a topic
bool publishTopic(Subscription &sub, void *data)
publish an orb topic
std::streampos _data_section_start
first ADD_LOGGED_MSG message
int _gyro_integral_dt_offset_intern
struct @83::@85::@87 file
std::set< std::string > _overridden_params
uint64_t next_timestamp
timestamp of the file
static const orb_metadata * findTopic(const std::string &name)
static size_t sizeOfFullType(const std::string &type_name_full)
get the size of a type that can be an array
int orb_unadvertise(orb_advert_t handle)
static size_t sizeOfType(const std::string &type_name)
get the size of a type that is not an array
bool readFileHeader(std::ifstream &file)
size_t orb_topics_count() __EXPORT
bool readDefinitionsAndApplyParams(std::ifstream &file)
Read the file header and definitions sections.
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, int priority)
static void setupReplayFile(const char *file_name)
Tell the replay module that we want to use replay mode.
void setUserParams(const char *filename)
bool readFormat(std::ifstream &file, uint16_t msg_size)
File parsing methods. They return false when further parsing should be aborted.
__EXPORT hrt_abstime hrt_absolute_time(void)
Get absolute time in [us] (does not wrap).
uint32_t param_t
Parameter handle.
virtual bool handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file)
handle the publication of a topic update