travelynx/lib/Travelynx/Command/work.pm
Daniel Friesel 89e709d8d5 Allow linking a Träwelling account, auto-sync Träwelling→travelynx
travelynx→Träwelling is still work-in-progress

Squashed commit of the following:

commit 97faa6e2e6c8d20fba30f2d0f6e78187ceeb72e6
Author: Daniel Friesel <derf@finalrewind.org>
Date:   Wed Sep 30 18:50:05 2020 +0200

    improve traewelling log and tx handling

commit 487d7dd728b9d45b731bdc7098cf3358ea2e206e
Author: Daniel Friesel <derf@finalrewind.org>
Date:   Wed Sep 30 18:02:41 2020 +0200

    add missing traewelling template

commit 0148da2f48d9a52dcddc0ab81f83d8f8ac3062ab
Author: Daniel Friesel <derf@finalrewind.org>
Date:   Wed Sep 30 18:02:35 2020 +0200

    improve traewelling pull sync

commit 4861a9750f9f2d7621043361d0af6b0a8869a0df
Author: Daniel Friesel <derf@finalrewind.org>
Date:   Tue Sep 29 22:14:24 2020 +0200

    wip checkin from traewelling

commit f6aeb6f06998a2a7a80f63a7b1b688b1a26b66bd
Author: Daniel Friesel <derf@finalrewind.org>
Date:   Tue Sep 29 18:37:53 2020 +0200

    refactor traewelling integration. login and logout are less of a hack now.

    checkin and checkout are not supported at the moment.
2020-09-30 19:12:29 +02:00

286 lines
7.4 KiB
Perl

package Travelynx::Command::work;
use Mojo::Base 'Mojolicious::Command';
use DateTime;
use JSON;
use List::Util;
has description =>
'Perform automatic checkout when users arrive at their destination';
has usage => sub { shift->extract_usage };
sub run {
my ($self) = @_;
my $now = DateTime->now( time_zone => 'Europe/Berlin' );
my $json = JSON->new;
my $db = $self->app->pg->db;
for my $entry (
$db->select( 'in_transit_str', '*', { cancelled => 0 } )->hashes->each )
{
my $uid = $entry->{user_id};
my $dep = $entry->{dep_eva};
my $arr = $entry->{arr_eva};
my $train_id = $entry->{train_id};
# Note: IRIS data is not always updated in real-time. Both departure and
# arrival delays may take several minutes to appear, especially in case
# of large-scale disturbances. We work around this by continuing to
# update departure data for up to 15 minutes after departure and
# delaying automatic checkout by at least 10 minutes.
eval {
if ( $now->epoch - $entry->{real_dep_ts} < 900 ) {
my $status = $self->app->iris->get_departures(
station => $dep,
lookbehind => 30,
lookahead => 30
);
if ( $status->{errstr} ) {
die("get_departures($dep): $status->{errstr}\n");
}
my ($train) = List::Util::first { $_->train_id eq $train_id }
@{ $status->{results} };
if ( not $train ) {
die("could not find train $train_id at $dep\n");
}
# selecting on user_id and train_no avoids a race condition when
# a user checks into a new train while we are fetching data for
# their previous journey. In this case, the new train would
# receive data from the previous journey.
$db->update(
'in_transit',
{
dep_platform => $train->platform,
real_departure => $train->departure,
route =>
$json->encode( [ $self->app->route_diff($train) ] ),
messages => $json->encode(
[
map { [ $_->[0]->epoch, $_->[1] ] }
$train->messages
]
),
},
{
user_id => $uid,
train_no => $train->train_no
}
);
if ( $train->departure_is_cancelled and $arr ) {
# depending on the amount of users in transit, some time may
# have passed between fetching $entry from the database and
# now. Ensure that the user is still checked into this train
# before calling checkout to mark the cancellation.
if (
$db->select(
'in_transit',
'count(*) as count',
{
user_id => $uid,
train_no => $train->train_no,
checkin_station_id => $dep,
checkout_station_id => $arr,
}
)->hash->{count}
)
{
$db->update(
'in_transit',
{
cancelled => 1,
},
{
user_id => $uid,
train_no => $train->train_no,
checkin_station_id => $dep,
checkout_station_id => $arr,
}
);
# check out (adds a cancelled journey and resets journey state
# to checkin
$self->app->checkout(
station => $arr,
force => 1,
uid => $uid
);
}
}
else {
$self->app->add_route_timestamps( $uid, $train, 1 );
}
}
};
if ($@) {
$self->app->log->error("work($uid)/departure: $@");
}
eval {
if (
$arr
and ( not $entry->{real_arr_ts}
or $now->epoch - $entry->{real_arr_ts} < 600 )
)
{
my $status = $self->app->iris->get_departures(
station => $arr,
lookbehind => 20,
lookahead => 220
);
if ( $status->{errstr} ) {
die("get_departures($arr): $status->{errstr}\n");
}
# Note that a train may pass the same station several times.
# Notable example: S41 / S42 ("Ringbahn") both starts and
# terminates at Berlin Südkreuz
my ($train) = List::Util::first {
$_->train_id eq $train_id
and $_->sched_arrival
and $_->sched_arrival->epoch > $entry->{sched_dep_ts}
}
@{ $status->{results} };
$train //= List::Util::first { $_->train_id eq $train_id }
@{ $status->{results} };
if ( not $train ) {
# If we haven't seen the train yet, its arrival is probably
# too far in the future. This is not critical.
return;
}
# selecting on user_id, train_no and checkout_station_id avoids a
# race condition when a user checks into a new train or changes
# their destination station while we are fetching times based on no
# longer valid database entries.
$db->update(
'in_transit',
{
arr_platform => $train->platform,
sched_arrival => $train->sched_arrival,
real_arrival => $train->arrival,
route =>
$json->encode( [ $self->app->route_diff($train) ] ),
messages => $json->encode(
[
map { [ $_->[0]->epoch, $_->[1] ] }
$train->messages
]
),
},
{
user_id => $uid,
train_no => $train->train_no,
checkout_station_id => $arr
}
);
if ( $train->arrival_is_cancelled ) {
# depending on the amount of users in transit, some time may
# have passed between fetching $entry from the database and
# now. Ensure that the user is still checked into this train
# before calling checkout to mark the cancellation.
if (
$db->select(
'in_transit',
'count(*) as count',
{
user_id => $uid,
train_no => $train->train_no,
checkout_station_id => $arr
}
)->hash->{count}
)
{
# check out (adds a cancelled journey and resets journey state
# to destination selection)
$self->app->checkout(
station => $arr,
force => 0,
uid => $uid
);
}
}
else {
$self->app->add_route_timestamps( $uid, $train, 0 );
}
}
elsif ( $entry->{real_arr_ts} ) {
my ( undef, $error ) = $self->app->checkout(
station => $arr,
force => 1,
uid => $uid
);
if ($error) {
die("${error}\n");
}
}
};
if ($@) {
$self->app->log->error("work($uid)/arrival: $@");
}
eval { }
}
for my $account_data ( $self->app->traewelling->get_pull_accounts ) {
# $account_data->{user_id} is the travelynx uid
# $account_data->{user_name} is the Träwelling username
$self->app->log->debug(
"Pulling Traewelling status for UID $account_data->{user_id}");
$self->app->traewelling_api->get_status_p(
username => $account_data->{data}{user_name},
token => $account_data->{token}
)->then(
sub {
my ($traewelling) = @_;
$self->app->traewelling_to_travelynx(
traewelling => $traewelling,
user_data => $account_data
);
}
)->catch(
sub {
my ($err) = @_;
$self->app->log->debug("Error $err");
}
)->wait;
}
# Computing yearly stats may take a while, but we've got all time in the
# world here. This means users won't have to wait when loading their
# own by-year journey log.
for my $user ( $db->select( 'users', 'id', { status => 1 } )->hashes->each )
{
$self->app->get_journey_stats(
uid => $user->{id},
year => $now->year
);
}
# TODO wait until all background jobs have terminated
}
1;
__END__
=head1 SYNOPSIS
Usage: index.pl work
Work Work Work.
Should be called from a cronjob every three minutes or so.