Dropped samples after system clock adjustments #5019

Open
1 task done
ma30002000 opened this issue Jul 2, 2024 · 13 comments
Labels
in progress Issue or PR which is being reviewed

Comments

@ma30002000
Contributor

ma30002000 commented Jul 2, 2024

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

When the system clock is adjusted (manually or through time-server synchronization), my subscriber drops all samples once the clock has been set back into the past.
This seems to be caused by DataReaderHistory::received_change_keep_last / DataReaderHistory::completed_change_keep_last comparing each incoming sample's sourceTimestamp (which lies in the past once the clock has been changed) with that of the first change in the instance, so that every sample is dropped:

        CacheChange_t* first_change = instance_changes.at(0);
        if (change->sourceTimestamp >= first_change->sourceTimestamp)
        {
            // As the instance is ordered by source timestamp, we can always remove the first one.
            ret_value = remove_change_sub(first_change);
        }
        else
        {
            // Received change is older than oldest, and should be discarded
            return true;
        }

If I remove the if and always drop the first sample, everything seems to work regardless of system clock adjustments.
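
The effect can be reproduced in isolation with a minimal sketch. The `Sample` struct and the two functions below are hypothetical stand-ins for `CacheChange_t` and the quoted condition, not Fast DDS code, and timestamps are simplified to plain seconds.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for CacheChange_t: only the fields the check uses.
struct Sample
{
    std::uint64_t sequence_number;
    double source_timestamp;  // seconds, taken from the writer's system clock
};

// Mirrors the quoted condition for a full KEEP_LAST history: the incoming
// sample replaces the oldest one only if its source timestamp is not older.
inline bool keep_last_accepts(
        const Sample& oldest,
        const Sample& incoming)
{
    return incoming.source_timestamp >= oldest.source_timestamp;
}

// Feeds the samples into a depth-1 history and counts the dropped ones.
inline std::size_t count_dropped(
        const std::vector<Sample>& samples)
{
    assert(!samples.empty());
    Sample oldest = samples.front();
    std::size_t dropped = 0;
    for (std::size_t i = 1; i < samples.size(); ++i)
    {
        if (keep_last_accepts(oldest, samples[i]))
        {
            oldest = samples[i];
        }
        else
        {
            ++dropped;
        }
    }
    return dropped;
}
```

With a sequence whose source timestamps jump back after the second sample, every later sample is rejected until the writer's clock catches up with the timestamp stored in the history.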

Note that I would expect the current Fast DDS behaviour if DestinationOrderQosPolicy were implemented (which it is not) and set to BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS. However, according to the manual, the behaviour should be that of BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.

Note that I have created a pull request for additional observations when the system clock is adjusted (PR #5018).

This might be related to #4850.

Current behavior

Samples get dropped when the publisher's system clock is set into the past.

Steps to reproduce

Set back system clock after disabling automatic synchronization via NTP.

Fast DDS version/commit

2.14.2

Platform/Architecture

Ubuntu Focal 20.04 amd64

Transport layer

Shared Memory Transport (SHM)

Additional context

No response

XML configuration file

No response

Relevant log output

No response

Network traffic capture

No response

@ma30002000 ma30002000 added the triage Issue pending classification label Jul 2, 2024
@i-and

i-and commented Jul 7, 2024

In my opinion, the stack should be as independent of the system clock as possible. Otherwise, hard-to-diagnose errors occur at arbitrary points in time (whenever the system clock jumps forward or backward), as @ma30002000 pointed out above. From this point of view, the use of sourceTimestamp (which is based on the system clock) should be minimized. Currently, sourceTimestamp is used in the stack in the following five places:

  1. Where @ma30002000 indicated, namely in the DataReaderHistory::received_change_keep_last() and DataReaderHistory::completed_change_keep_last() methods. This condition for discarding received samples contradicts the QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS in use, i.e. the reader history should not be sorted by source time. Also, these samples, despite having larger sequence numbers, are silently discarded without notifying the user (no REJECTED_BY_... callback is called). I suggest removing these sample drop conditions from the code for QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.
  2. The reader's history is sorted by source timestamp using the following comparison function:
    inline bool history_order_cmp(
            const CacheChange_t* lhs,
            const CacheChange_t* rhs)
    {
        return lhs->writerGUID == rhs->writerGUID ?
               lhs->sequenceNumber < rhs->sequenceNumber :
               lhs->sourceTimestamp < rhs->sourceTimestamp;
    }

    This does not match the configured QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS. I propose implementing the sorting according to BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS, without using sourceTimestamp. For that, it will probably be enough to use the sample sequence numbers from the corresponding DataWriter.
  3. In the implementation of the Lifespan QoS for DataWriter. For practical use, this policy should work in terms of local elapsed time and not depend on system clock jumps (after a backward jump, the sample mistakenly stays in the history for longer than the configured Lifespan duration; after a forward jump, it is removed from the history too early). Technically, this can be implemented by adding a steady clock timestamp to CacheChange_t and evaluating it in the DataWriterImpl::lifespan_expired() method.
  4. In the implementation of the Lifespan QoS for DataReader. The reader's implementation assumes that the Writer's and Reader's system clocks are synchronized. If they are not, the received samples are discarded in the DataReaderImpl::on_new_cache_change_added() method. Incorrect behavior will also be observed here when the time jumps forward or backward (as in point 3 above). It would be more useful for the user to have a mechanism that controls the expiry of samples in the Reader's history based on the local (steady) clock, independent of how well the system clocks are synchronized. I suggest considering such an implementation (perhaps activated by a new QoS parameter for DataReader, to preserve the existing default behavior of the Lifespan QoS).
  5. In the implementation of disable_positive_ack on the StatefulWriter side. I suggest considering a steady clock, stamped on the CacheChange_t (see point 3 above).
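
The steady-clock stamping suggested in points 3 and 5 could look roughly like the sketch below. `StampedChange` and this free-standing `lifespan_expired` are hypothetical illustrations, not the existing `CacheChange_t` or `DataWriterImpl::lifespan_expired()`.

```cpp
#include <cassert>
#include <chrono>

using SteadyClock = std::chrono::steady_clock;

// Hypothetical cache change: besides the source timestamp (system clock),
// it carries a steady-clock stamp taken when the sample was written.
struct StampedChange
{
    std::chrono::system_clock::time_point source_timestamp;
    SteadyClock::time_point steady_stamp;
};

// Lifespan check that looks only at the steady-clock stamp, so a jump of the
// system clock (and therefore of source_timestamp) cannot change the result.
inline bool lifespan_expired(
        const StampedChange& change,
        SteadyClock::duration lifespan,
        SteadyClock::time_point now)
{
    assert(now >= change.steady_stamp);
    return (now - change.steady_stamp) >= lifespan;
}
```

In real use `now` would be `SteadyClock::now()`; it is a parameter here only so that the behaviour can be checked with fixed values. The source timestamp stays in the struct because it still has to be sent on the wire.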

Please comment on the five points presented above.

@elianalf
Contributor

elianalf commented Jul 9, 2024

Hi @ma30002000,
Thanks for using Fast DDS.
We are trying to reproduce and investigate the issue. We will get back to you with feedback.

@elianalf elianalf added in progress Issue or PR which is being reviewed and removed triage Issue pending classification labels Jul 9, 2024
@ma30002000
Contributor Author

Any indications and hints concerning the root cause (and a possible fix) would be highly appreciated..

@MiguelCompany
Member

@ma30002000 @i-and

Thank you for your patience here.

I have rebased #5018 and added some commits that

  1. Refactor parts of the code using std::chrono::system_clock to always use Time_t equivalent code.
  2. Update current_time_since_unix_epoch so Time_t::now() is steady.

These changes improve resilience against system_clock being updated while the application is running.
Would you please review / test those changes?

Regarding lifespan, section 2.2.3.16 of the DDS Standard states:

  1. The ‘expiration time’ of each sample is computed by adding the duration specified by the LIFESPAN QoS to the source timestamp
  2. This QoS relies on the sender and receiving applications having their clocks sufficiently synchronized

So there's not much we can do here.
In the future, we could add a way for the user to inject a method that returns the current time, and make Time_t::now() depend upon it.
What do you think about that?
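
One possible shape for such an injection point is sketched below. `TimeStamp`, `InjectableClock` and `system_time_source` are hypothetical names chosen for illustration and are not part of the Fast DDS API; the sketch is also not thread-safe.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical seconds/nanoseconds pair, loosely modelled on a DDS timestamp.
struct TimeStamp
{
    std::int32_t seconds;
    std::uint32_t nanosec;
};

using TimeSource = std::function<TimeStamp()>;

// Default source: the regular system clock.
inline TimeStamp system_time_source()
{
    using namespace std::chrono;
    const auto since_epoch = system_clock::now().time_since_epoch();
    const auto secs = duration_cast<seconds>(since_epoch);
    const auto nanos = duration_cast<nanoseconds>(since_epoch - secs);
    return TimeStamp{static_cast<std::int32_t>(secs.count()),
                     static_cast<std::uint32_t>(nanos.count())};
}

// Clock whose notion of "now" can be replaced by the user.
class InjectableClock
{
public:

    InjectableClock()
        : source_(system_time_source)
    {
    }

    void set_source(
            TimeSource source)
    {
        assert(source != nullptr);
        source_ = std::move(source);
    }

    TimeStamp now() const
    {
        return source_();
    }

private:

    TimeSource source_;
};
```

A user with a PTP-disciplined or simulated clock would call `set_source` once at start-up, and every timestamp obtained through `now()` would then come from that clock.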

@MiguelCompany
Member

@ma30002000 @i-and FYI, we have just merged #5018 to master, and ordered backports to the supported versions (2.14.x, and 2.10.x)

@i-and

i-and commented Sep 3, 2024

Hi @MiguelCompany,
I will try to see the changes by the end of this week.

@i-and

i-and commented Sep 9, 2024

The proposed changes do not completely solve the issue. To confirm this, I wrote a test based on the hello_world example at git revision 70314ce (Date: Fri Sep 6 08:48:14 2024 +0200).
The following patch also needs to be applied:

diff --git a/examples/cpp/hello_world/PublisherApp.cpp b/examples/cpp/hello_world/PublisherApp.cpp
index 63b433113..3bb9cc515 100644
--- a/examples/cpp/hello_world/PublisherApp.cpp
+++ b/examples/cpp/hello_world/PublisherApp.cpp
@@ -130,6 +130,16 @@ void PublisherApp::run()
 {
     while (!is_stopped() && ((samples_ == 0) || (hello_.index() < samples_)))
     {
+        if (hello_.index() >= 5) 
+        {
+            std::cout << "Index=" << hello_.index() << ". Paused forever..." << std::endl;
+            std::unique_lock<std::mutex> period_lock(mutex_);
+            cv_.wait(period_lock, [&]()
+                {
+                    return is_stopped();
+                });
+        }
+
         if (publish())
         {
             std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index()
diff --git a/examples/cpp/hello_world/WaitsetSubscriberApp.cpp b/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
index f039365b6..2581a83f6 100644
--- a/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
+++ b/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
@@ -21,6 +21,7 @@
 
 #include <condition_variable>
 #include <stdexcept>
+#include <thread>
 
 #include <fastdds/dds/core/condition/GuardCondition.hpp>
 #include <fastdds/dds/core/condition/WaitSet.hpp>
@@ -134,11 +135,11 @@ void WaitsetSubscriberApp::run()
                     reader_->get_subscription_matched_status(status_);
                     if (status_.current_count_change == 1)
                     {
-                        std::cout << "Waitset Subscriber matched." << std::endl;
+                        std::cout << "Waitset Subscriber matched. Handle=" << status_.last_publication_handle << std::endl;
                     }
                     else if (status_.current_count_change == -1)
                     {
-                        std::cout << "Waitset Subscriber unmatched." << std::endl;
+                        std::cout << "Waitset Subscriber unmatched."  << status_.last_publication_handle << std::endl;
                     }
                     else
                     {
@@ -150,6 +151,12 @@ void WaitsetSubscriberApp::run()
                 if (changed_statuses.is_active(StatusMask::data_available()))
                 {
                     SampleInfo info;
+                    if (received_samples_ == 1)
+                    {
+                        auto pause = std::chrono::milliseconds(60000);
+                        std::cout << "When data available WAITING for " << pause.count() << std::endl;
+                        std::this_thread::sleep_for(pause);
+                    }
                     while ((!is_stopped()) &&
                             (RETCODE_OK == reader_->take_next_sample(&hello_, &info)))
                     {
@@ -158,7 +165,7 @@ void WaitsetSubscriberApp::run()
                             received_samples_++;
                             // Print Hello world message data
                             std::cout << "Message: '" << hello_.message() << "' with index: '"
-                                      << hello_.index() << "' RECEIVED" << std::endl;
+                                      << hello_.index() << "' RECEIVED from " << info.publication_handle << std::endl;
                             if (samples_ > 0 && (received_samples_ >= samples_))
                             {
                                 stop();
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index 0dcd18a45..f0bcfed2e 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -262,6 +262,7 @@ bool DataReaderHistory::received_change_keep_last(
         DataReaderInstance::ChangeCollection& instance_changes = vit->second->cache_changes;
         if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
         {
+            std::cout << "history_qos_.depth=" << history_qos_.depth << std::endl;
             ret_value = true;
         }
         else
@@ -270,11 +271,17 @@ bool DataReaderHistory::received_change_keep_last(
             CacheChange_t* first_change = instance_changes.at(0);
             if (a_change->sourceTimestamp >= first_change->sourceTimestamp)
             {
+                std::cout <<  "As the instance is ordered by source timestamp, we can always remove the first one. "
+                    "history_qos_.depth=" << history_qos_.depth << ", a_change->sourceTimestamp=" << a_change->sourceTimestamp <<
+                    ", first_change->sourceTimestamp=" << first_change->sourceTimestamp << std::endl;
                 // As the instance is ordered by source timestamp, we can always remove the first one.
                 ret_value = remove_change_sub(first_change);
             }
             else
             {
+                std::cout <<  "Received change is older than oldest, and should be discarded. "
+                    "history_qos_.depth=" << history_qos_.depth << ", a_change->sourceTimestamp=" << a_change->sourceTimestamp <<
+                    ", first_change->sourceTimestamp=" << first_change->sourceTimestamp << std::endl;
                 // Received change is older than oldest, and should be discarded
                 return true;
             }

After compiling the hello_world example, open three terminals and run the following in each:

  1. In the first terminal, run ./hello_world subscriber -w. The subsequent actions in the 2nd and 3rd terminals must be completed within 60 seconds.
  2. In the second terminal: ./hello_world publisher.
  3. After disabling the time synchronization service (sudo systemctl stop systemd-timesyncd), set the system clock back (for example, by one hour).
  4. In the third terminal: ./hello_world publisher.
    As a result (output of terminal 1, followed by the output of terminals 2 and 3):
Subscriber running. Please press Ctrl+C to stop the Subscriber at any time.
Waitset Subscriber matched. Handle=1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
history_qos_.depth=1
Message: 'Hello world' with index: '1' RECEIVED from 1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
history_qos_.depth=1
When data available WAITING for 60000
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831240.997453690, first_change->sourceTimestamp=1725831240.896825205
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831241.97925046, first_change->sourceTimestamp=1725831240.997453690
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831241.198400104, first_change->sourceTimestamp=1725831241.97925046
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.63839469, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.164280786, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.264997918, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.365657331, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.466348101, first_change->sourceTimestamp=1725831241.198400104
Message: 'Hello world' with index: '5' RECEIVED from 1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
Waitset Subscriber matched. Handle=1.f.f7.f3.b.59.9d.7d.0.0.0.0.0.0.1.3
Publisher running. Please press Ctrl+C to stop the Publisher at any time.
Publisher matched.
Message: 'Hello world' with index: '1' SENT
Message: 'Hello world' with index: '2' SENT
Message: 'Hello world' with index: '3' SENT
Message: 'Hello world' with index: '4' SENT
Message: 'Hello world' with index: '5' SENT
Index=5. Paused forever...
Publisher running. Please press Ctrl+C to stop the Publisher at any time.
Publisher matched.
Message: 'Hello world' with index: '1' SENT
Message: 'Hello world' with index: '2' SENT
Message: 'Hello world' with index: '3' SENT
Message: 'Hello world' with index: '4' SENT
Message: 'Hello world' with index: '5' SENT
Index=5. Paused forever...

As you can see from terminal 1, all samples from the second publisher were discarded in the DataReaderHistory::received_change_keep_last() method, i.e. the issue is still present.
In my opinion, to fix this error reliably, you should follow @ma30002000's recommendation and remove the conditions in DataReaderHistory::received_change_keep_last and DataReaderHistory::completed_change_keep_last that lead to samples being discarded.
To do this, you will probably have to rework the sorting criteria for the reader's history so that they do not depend on sourceTimestamp. This has to be done anyway for a correct implementation of QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.

Regarding the new implementation of the function current_time_since_unix_epoch():

static void current_time_since_unix_epoch(
        int32_t& secs,
        uint32_t& nanosecs)
{
    using namespace std::chrono;
    static const auto init_time_since_epoch = system_clock::now().time_since_epoch();
    static const auto init_steady_time = steady_clock::now();
    // Get time since epoch
    auto t_elapsed = steady_clock::now() - init_steady_time;
    auto t_since_epoch = init_time_since_epoch + t_elapsed;
    // Get seconds
    auto secs_t = duration_cast<seconds>(t_since_epoch);
    // Remove seconds from time
    t_since_epoch -= secs_t;
    // Get seconds and nanoseconds
    secs = static_cast<int32_t>(secs_t.count());
    nanosecs = static_cast<uint32_t>(duration_cast<nanoseconds>(t_since_epoch).count());
}

I suggest not doing this and returning to the previous implementation, for the following reasons:

  1. This does not solve the issue (see the test above).
  2. The time adjustment is completely blocked when the FastDDS starts before the system time synchronization with the master is performed.
  3. Since the frequency generators for the steady and system clocks are not connected in any way, their divergence will be observed over time.
  4. From the user's point of view, the interface will change because the time returned in many places (for example in the SampleInfo::source_timestamp) will no longer correspond to the usual system clocks.

Based on the above, I propose fixing the handling of the system clock at the places where it is used, while continuing to use the standard system clock (without freezing it at the time FastDDS is launched).
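
Reason 2 can be illustrated with a small model of the quoted function in which both clocks are injected, so they can be faked. `FrozenOffsetClock` and the `Nanos` alias are hypothetical and exist only for this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Times are plain nanosecond counts so that both clocks can be faked.
using Nanos = std::int64_t;

// Model of the quoted approach: read the wall clock once at construction,
// then advance that value using only the steady clock.
class FrozenOffsetClock
{
public:

    FrozenOffsetClock(
            std::function<Nanos()> wall_clock,
            std::function<Nanos()> steady_clock)
        : steady_clock_(std::move(steady_clock))
        , init_wall_(wall_clock())
        , init_steady_(steady_clock_())
    {
        assert(steady_clock_ != nullptr);
    }

    // The wall clock is never consulted again, so later corrections are lost.
    Nanos now() const
    {
        return init_wall_ + (steady_clock_() - init_steady_);
    }

private:

    std::function<Nanos()> steady_clock_;
    Nanos init_wall_;
    Nanos init_steady_;
};
```

If the wall clock is corrected after construction (for example by the first NTP synchronization), `now()` keeps reporting the uncorrected time plus the elapsed steady time.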

Section 2.2.3.16 of the DDS Standard contains the following:
This QoS relies on the sender and receiving applications having their clocks sufficiently synchronized. If this is not the case and the Service can detect it, the DataReader is allowed to use the reception timestamp instead of the source timestamp in its computation of the ‘expiration time.’

For the Service (FastDDS) to detect the lack of synchronization, a user-settable parameter could be added, for example. The user would set this parameter to "do not use the source timestamp" when they know for certain that the clocks in the system are not synchronized. This would probably be a compromise that does not contradict the DDS standard.
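
A rough sketch of such a parameter follows. `LifespanTimeBase`, `ReceivedSample` and both functions are hypothetical names for illustration; no such setting exists in Fast DDS as far as this thread shows.

```cpp
#include <cassert>
#include <cstdint>

using Nanos = std::int64_t;

// Hypothetical user-settable parameter selecting the base of the lifespan.
enum class LifespanTimeBase
{
    SourceTimestamp,    // behaviour described by the standard as the default
    ReceptionTimestamp  // allowed when clocks are known to be unsynchronized
};

// Hypothetical view of a received sample with both timestamps.
struct ReceivedSample
{
    Nanos source_timestamp;     // writer's clock
    Nanos reception_timestamp;  // reader's clock
};

inline Nanos expiration_time(
        const ReceivedSample& sample,
        Nanos lifespan,
        LifespanTimeBase base)
{
    assert(lifespan >= 0);
    const Nanos start = (base == LifespanTimeBase::SourceTimestamp) ?
            sample.source_timestamp : sample.reception_timestamp;
    return start + lifespan;
}

inline bool is_expired(
        const ReceivedSample& sample,
        Nanos lifespan,
        LifespanTimeBase base,
        Nanos reader_now)
{
    return reader_now >= expiration_time(sample, lifespan, base);
}
```

With a writer clock far behind the reader's, the source-based check expires the sample immediately, while the reception-based check keeps it for the configured lifespan.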

@MiguelCompany
Member

@i-and Thank you for testing.

First, let me highlight one of my comments above:

These changes improve resilience against system_clock being updated while the application is running.

So yes, the merged PR was only addressing the case of updating the system clock after the application has been launched.
That said, I agree on reverting the implementation of current_time_since_unix_epoch due to this comment:

2. The time adjustment is completely blocked when the FastDDS starts before the system time synchronization with the master is performed.

We will try to review the implementation of the reader history to improve this situation, but in the meantime I suggest you use write_w_timestamp so you can set your own clock when writing samples.
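
As a minimal sketch of that workaround, the helper below splits a nanosecond count from a user-chosen clock into the seconds/nanoseconds pair that a DDS timestamp carries. `SecNanosec`, `split_timestamp` and `my_clock_now` are hypothetical; the `write_w_timestamp` call in the trailing comment is an assumed usage and should be checked against the API reference of the Fast DDS version in use.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical plain pair; convert it to the library's Time_t when writing.
struct SecNanosec
{
    std::int32_t seconds;
    std::uint32_t nanosec;
};

// Splits a non-negative time since epoch into whole seconds and the
// remaining nanoseconds.
inline SecNanosec split_timestamp(
        std::chrono::nanoseconds since_epoch)
{
    using namespace std::chrono;
    assert(since_epoch.count() >= 0);
    const auto secs = duration_cast<seconds>(since_epoch);
    const auto rest = since_epoch - secs;
    return SecNanosec{static_cast<std::int32_t>(secs.count()),
                      static_cast<std::uint32_t>(rest.count())};
}

// Assumed usage (not compiled here; my_clock_now() is a hypothetical
// user-provided clock returning std::chrono::nanoseconds):
//   const SecNanosec ts = split_timestamp(my_clock_now());
//   writer->write_w_timestamp(&sample, HANDLE_NIL, Time_t(ts.seconds, ts.nanosec));
```

Whatever clock is chosen must still be monotonic from the reader's point of view, otherwise the same keep-last check discards the samples.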

@MiguelCompany
Member

MiguelCompany commented Sep 9, 2024

That said, I agree on reverting the implementation of current_time_since_unix_epoch

@i-and We did this in #5213

@i-and

i-and commented Sep 9, 2024

@MiguelCompany Thanks for the quick response.

We will try to review the implementation of the reader history to improve this situation,

When reworking the reader's history, please take into account the actual state of DestinationOrderQosPolicy. There is currently a contradiction: the value BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS is set, but the behavior corresponds rather to BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS.
Ideally, implement DestinationOrderQosPolicy with full support for these two options.
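
To make the difference between the two options concrete, here is a minimal sketch of an ordering predicate that switches on the policy kind. `DestinationOrderKind`, `OrderedSample` and `ordered_before` are hypothetical names, not Fast DDS types, and timestamps are simplified to integers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirror of the two DestinationOrderQosPolicy kinds.
enum class DestinationOrderKind
{
    ByReceptionTimestamp,
    BySourceTimestamp
};

// Hypothetical sample view carrying both timestamps.
struct OrderedSample
{
    std::int64_t source_timestamp;     // writer's system clock
    std::int64_t reception_timestamp;  // reader's local clock
};

// Returns true when lhs should be placed before rhs in the reader history.
inline bool ordered_before(
        const OrderedSample& lhs,
        const OrderedSample& rhs,
        DestinationOrderKind kind)
{
    assert(kind == DestinationOrderKind::ByReceptionTimestamp ||
            kind == DestinationOrderKind::BySourceTimestamp);
    return kind == DestinationOrderKind::BySourceTimestamp ?
           lhs.source_timestamp < rhs.source_timestamp :
           lhs.reception_timestamp < rhs.reception_timestamp;
}
```

For a sample sent after the writer's clock was set back, only the source-timestamp kind places it before earlier samples; the reception kind keeps arrival order, provided the reception stamp itself comes from a clock that does not jump.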

@ma30002000
Contributor Author

Hi, sorry for the long delay....

I also agree on reverting the implementation of current_time_since_unix_epoch.

A proper implementation of DestinationOrderQosPolicy with the correct behavior would be nice ;-)

I worked around the problem with the following patch, which tolerates system_clock jumps when Fast DDS is used on a single host only
(similar to https://github.com/eProsima/Fast-DDS/blob/master/src/cpp/rtps/common/ChangeComparison.hpp):

diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index d8455e497..94ae73ad2 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -274,7 +274,9 @@ bool DataReaderHistory::received_change_keep_last(
         {
             // Try to substitute the oldest sample.
             CacheChange_t* first_change = instance_changes.at(0);
-            if (a_change->sourceTimestamp >= first_change->sourceTimestamp)
+            if (a_change->writerGUID == first_change->writerGUID ?
+                a_change->sequenceNumber >= first_change->sequenceNumber :
+                a_change->sourceTimestamp >= first_change->sourceTimestamp)
             {
                 // As the instance is ordered by source timestamp, we can always remove the first one.
                 ret_value = remove_change_sub(first_change);
@@ -808,7 +810,9 @@ bool DataReaderHistory::completed_change_keep_last(
     {
         // Try to substitute the oldest sample.
         CacheChange_t* first_change = instance_changes.at(0);
-        if (change->sourceTimestamp >= first_change->sourceTimestamp)
+        if (change->writerGUID == first_change->writerGUID ?
+            change->sequenceNumber >= first_change->sequenceNumber :
+            change->sourceTimestamp >= first_change->sourceTimestamp)
         {
             // As the instance is ordered by source timestamp, we can always remove the first one.
             ret_value = remove_change_sub(first_change);

This should make sample substitution consistent with the history ordering in my opinion.
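
The patched condition can be exercised outside the library with a minimal sketch. `Change` and `can_replace_oldest` are hypothetical stand-ins for `CacheChange_t` and the condition in the diff, and the writer GUID is reduced to a single integer.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for CacheChange_t with only the compared fields.
struct Change
{
    std::uint32_t writer_id;        // simplified writer GUID
    std::uint64_t sequence_number;
    std::int64_t source_timestamp;  // writer's system clock
};

// Same shape as the patched condition: compare sequence numbers for samples
// of the same writer, fall back to source timestamps across writers.
inline bool can_replace_oldest(
        const Change& oldest,
        const Change& incoming)
{
    return incoming.writer_id == oldest.writer_id ?
           incoming.sequence_number >= oldest.sequence_number :
           incoming.source_timestamp >= oldest.source_timestamp;
}
```

A later sample from the same writer is accepted even if its timestamp went backwards, whereas a sample from a different writer with an older timestamp is still rejected, which matches the two-publisher test shown earlier in the thread.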

@MiguelCompany
Member

@ma30002000 Thank you very much for that diff!
You could in fact use if (rtps::history_order_cmp(first_change, change)), and I do think that would fix the case of changing the system clock while the writer is running.

@ma30002000
Contributor Author

ma30002000 commented Oct 15, 2024

@MiguelCompany Yes, the attached patch (together with the changes in #5018 + patching of high_resolution_clock) fixed the issue in my specific use case (single host system).


4 participants