Skip to content

Commit

Permalink
File streaming fixes.
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 86c6d1394a31d6bb108a12e14f4d981133ece235
  • Loading branch information
levlam committed Dec 26, 2018
1 parent 3b238f6 commit ac3fa70
Show file tree
Hide file tree
Showing 28 changed files with 201 additions and 160 deletions.
2 changes: 1 addition & 1 deletion memprof/memprof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ void *memalign(std::size_t aligment, std::size_t size) {
}
}

// c++14 guarantees than it is enough to override this two operators.
// c++14 guarantees that it is enough to override these two operators.
void *operator new(std::size_t count) {
return malloc_with_frame(count, get_backtrace());
}
Expand Down
16 changes: 6 additions & 10 deletions td/generate/scheme/td_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ temporaryPasswordState has_password:Bool valid_for:int32 = TemporaryPasswordStat
//@can_be_deleted True, if the file can be deleted
//@is_downloading_active True, if the file is currently being downloaded (or a local copy is being generated by some other means)
//@is_downloading_completed True, if the local copy is fully available
//@download_offset Download will be started from this offset. downloaded_prefix_size is calculated from this offset.
//@downloaded_prefix_size If is_downloading_completed is false, then only some prefix of the file is ready to be read. downloaded_prefix_size is the size of that prefix
//@download_offset Download will be started from this offset. downloaded_prefix_size is calculated from this offset
//@downloaded_prefix_size If is_downloading_completed is false, then only some prefix of the file starting from download_offset is ready to be read. downloaded_prefix_size is the size of that prefix
//@downloaded_size Total downloaded file bytes. Should be used only for calculating download progress. The actual file size may be bigger, and some parts of it may contain garbage
localFile path:string can_be_downloaded:Bool can_be_deleted:Bool is_downloading_active:Bool is_downloading_completed:Bool download_offset:int32 downloaded_prefix_size:int32 downloaded_size:int32 = LocalFile;

Expand Down Expand Up @@ -2949,18 +2949,14 @@ setPinnedChats chat_ids:vector<int53> = Ok;
//@description Asynchronously downloads a file from the cloud. updateFile will be used to notify about the download progress and successful completion of the download. Returns file state just after the download has been started
//@file_id Identifier of the file to download
//@priority Priority of the download (1-32). The higher the priority, the earlier the file will be downloaded. If the priorities of two files are equal, then the last one for which downloadFile was called will be downloaded first
//@offset File will be downloaded starting from offset first. Supposed to be used for streaming.
//@offset File will be downloaded starting from that offset in bytes first. Supposed to be used for streaming
downloadFile file_id:int32 priority:int32 offset:int32 = File;

//@description Set offset for file downloading
//@file_id Identifier of file
//@offset File download offset
//@description Sets offset for file downloading @file_id Identifier of the file to change download offset @offset New file download offset
setFileDownloadOffset file_id:int32 offset:int32 = File;

//@description Get downloaded prefix from a given offset
//@file_id Identifier of file
//@offset Offset from which downloaded prefix is calculated
getFileDownloadedPrefix file_id:int32 offset:int32 = Count;
//@description Returns file downloaded prefix size from a given offset @file_id Identifier of the file @offset Offset from which downloaded prefix size should be calculated
getFileDownloadedPrefixSize file_id:int32 offset:int32 = Count;

//@description Stops the downloading of a file. If a file has already been downloaded, does nothing @file_id Identifier of a file to stop downloading @only_if_pending Pass true to stop downloading only if it hasn't been started, i.e. request hasn't been sent to server
cancelDownloadFile file_id:int32 only_if_pending:Bool = Ok;
Expand Down
Binary file modified td/generate/scheme/td_api.tlo
Binary file not shown.
3 changes: 3 additions & 0 deletions td/telegram/MessagesManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9956,6 +9956,9 @@ void MessagesManager::try_restore_dialog_reply_markup(Dialog *d, const Message *

void MessagesManager::set_dialog_pinned_message_notification(Dialog *d, MessageId message_id) {
auto old_message_id = d->pinned_message_notification_message_id;
if (!old_message_id.is_valid() && !message_id.is_valid()) {
return;
}
CHECK(old_message_id != message_id);
VLOG(notifications) << "Change pinned message notification in " << d->dialog_id << " from " << old_message_id
<< " to " << message_id;
Expand Down
2 changes: 1 addition & 1 deletion td/telegram/SecretChatActor.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class SecretChatActor : public NetQueryCallback {

SecretChatActor(int32 id, unique_ptr<Context> context, bool can_be_empty);

// First query to new chat must be on of this two
// First query to new chat must be on of these two
void update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat);
void create_chat(int32 user_id, int64 user_access_hash, int32 random_id, Promise<SecretChatId> promise);
void cancel_chat(Promise<> promise);
Expand Down
2 changes: 1 addition & 1 deletion td/telegram/StickersManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3040,7 +3040,7 @@ Result<std::tuple<FileId, bool, bool>> StickersManager::prepare_input_file(
if (file_view.has_url()) {
is_url = true;
} else {
if (file_view.has_local_location() && file_view.local_size() > MAX_STICKER_FILE_SIZE) {
if (file_view.has_local_location() && file_view.expected_size() > MAX_STICKER_FILE_SIZE) {
return Status::Error(400, "File is too big");
}
is_local = true;
Expand Down
4 changes: 2 additions & 2 deletions td/telegram/Td.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4858,13 +4858,13 @@ void Td::on_request(uint64 id, const td_api::getFile &request) {
send_closure(actor_id(this), &Td::send_result, id, file_manager_->get_file_object(FileId(request.file_id_, 0)));
}

void Td::on_request(uint64 id, const td_api::getFileDownloadedPrefix &request) {
void Td::on_request(uint64 id, const td_api::getFileDownloadedPrefixSize &request) {
auto file_view = file_manager_->get_file_view(FileId(request.file_id_, 0));
if (file_view.empty()) {
return send_closure(actor_id(this), &Td::send_error, id, Status::Error(10, "Unknown file id"));
}
send_closure(actor_id(this), &Td::send_result, id,
td_api::make_object<td_api::count>(static_cast<int32>(file_view.downloaded_prefix(request.offset_))));
td_api::make_object<td_api::count>(narrow_cast<int32>(file_view.downloaded_prefix(request.offset_))));
}

void Td::on_request(uint64 id, td_api::getRemoteFile &request) {
Expand Down
2 changes: 1 addition & 1 deletion td/telegram/Td.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class Td final : public NetQueryCallback {

void on_request(uint64 id, const td_api::getFile &request);

void on_request(uint64 id, const td_api::getFileDownloadedPrefix &request);
void on_request(uint64 id, const td_api::getFileDownloadedPrefixSize &request);

void on_request(uint64 id, td_api::getRemoteFile &request);

Expand Down
30 changes: 14 additions & 16 deletions td/telegram/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2251,11 +2251,11 @@ class CliClient final : public Actor {
send_request(make_tl_object<td_api::getChatMessageByDate>(as_chat_id(chat_id), to_integer<int32>(date)));
} else if (op == "gf" || op == "GetFile") {
send_request(make_tl_object<td_api::getFile>(as_file_id(args)));
} else if (op == "gfp" || op == "GetFileDownloadedPrefix") {
} else if (op == "gfdps") {
string file_id;
string offset;
std::tie(file_id, offset) = split(args);
send_request(make_tl_object<td_api::getFileDownloadedPrefix>(as_file_id(file_id), to_integer<int32>(offset)));
send_request(make_tl_object<td_api::getFileDownloadedPrefixSize>(as_file_id(file_id), to_integer<int32>(offset)));
} else if (op == "grf") {
send_request(make_tl_object<td_api::getRemoteFile>(args, nullptr));
} else if (op == "gmtf") {
Expand All @@ -2277,36 +2277,34 @@ class CliClient final : public Actor {
as_location(latitude, longitude), to_integer<int32>(zoom), to_integer<int32>(width),
to_integer<int32>(height), to_integer<int32>(scale), as_chat_id(chat_id)));
} else if (op == "sfdo" || op == "SetDownloadFileOffset") {
string file_id_str;
string file_id;
string offset;
std::tie(file_id_str, offset) = split(args);
std::tie(file_id, offset) = split(args);

auto file_id = as_file_id(file_id_str);
send_request(make_tl_object<td_api::setFileDownloadOffset>(file_id, to_integer<int32>(offset)));
send_request(make_tl_object<td_api::setFileDownloadOffset>(as_file_id(file_id), to_integer<int32>(offset)));
} else if (op == "df" || op == "DownloadFile") {
string file_id_str;
string file_id;
string priority;
string offset;
std::tie(file_id_str, args) = split(args);
std::tie(priority, offset) = split(args);
std::tie(file_id, args) = split(args);
std::tie(offset, priority) = split(args);
if (priority.empty()) {
priority = "1";
}

auto file_id = as_file_id(file_id_str);
send_request(
make_tl_object<td_api::downloadFile>(file_id, to_integer<int32>(priority), to_integer<int32>(offset)));
send_request(make_tl_object<td_api::downloadFile>(as_file_id(file_id), to_integer<int32>(priority),
to_integer<int32>(offset)));
} else if (op == "dff") {
string file_id;
string max_file_id;
string priority;
string offset;
std::tie(file_id, args) = split(args);
std::tie(priority, offset) = split(args);
std::tie(max_file_id, args) = split(args);
std::tie(offset, priority) = split(args);
if (priority.empty()) {
priority = "1";
}

for (int i = 1; i <= as_file_id(file_id); i++) {
for (int i = 1; i <= as_file_id(max_file_id); i++) {
send_request(make_tl_object<td_api::downloadFile>(i, to_integer<int32>(priority), to_integer<int32>(offset)));
}
} else if (op == "cdf") {
Expand Down
67 changes: 42 additions & 25 deletions td/telegram/files/FileBitmask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,80 +5,97 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileBitmask.h"

#include "td/utils/common.h"
#include "td/utils/misc.h"

namespace td {

Bitmask::Bitmask(Decode, Slice data) : data_(zero_one_decode(data)) {
}
Bitmask::Bitmask(Ones, int64 count) : data_((count + 7) / 8, '\0') {

Bitmask::Bitmask(Ones, int64 count) : data_(narrow_cast<size_t>((count + 7) / 8), '\0') {
for (int64 i = 0; i < count; i++) {
set(i);
}
}

std::string Bitmask::encode() const {
// remove zeroes in the end to make encoding deteministic
// remove zeroes in the end to make encoding deterministic
td::Slice data(data_);
while (!data.empty() && data.back() == 0) {
while (!data.empty() && data.back() == '\0') {
data.remove_suffix(1);
}
return zero_one_encode(data_);
return zero_one_encode(data);
}
Bitmask::ReadySize Bitmask::get_ready_size(int64 offset, int64 part_size) const {
ReadySize res;
res.offset = offset;

int64 Bitmask::get_ready_prefix_size(int64 offset, int64 part_size, int64 file_size) const {
auto offset_part = offset / part_size;
auto ones = get_ready_parts(offset_part);
if (ones == 0) {
res.ready_size = 0;
} else {
res.ready_size = (offset_part + ones) * part_size - offset;
return 0;
}
CHECK(res.ready_size >= 0);
auto ready_parts_end = (offset_part + ones) * part_size;
if (file_size != 0 && ready_parts_end > file_size) {
ready_parts_end = file_size;
if (offset > file_size) {
offset = file_size;
}
}
auto res = ready_parts_end - offset;
CHECK(res >= 0);
return res;
}

int64 Bitmask::get_total_size(int64 part_size) const {
int64 res = 0;
for (int64 i = 0; i < size(); i++) {
res += get(i);
res += static_cast<int64>(get(i));
}
return res * part_size;
}
bool Bitmask::get(int64 offset) const {
if (offset < 0) {

bool Bitmask::get(int64 offset_part) const {
if (offset_part < 0) {
return 0;
}
if (offset / 8 >= narrow_cast<int64>(data_.size())) {
auto index = narrow_cast<size_t>(offset_part / 8);
if (index >= data_.size()) {
return 0;
}
return (data_[offset / 8] & (1 << (offset % 8))) != 0;
return (static_cast<uint8>(data_[index]) & (1 << static_cast<int>(offset_part % 8))) != 0;
}

int64 Bitmask::get_ready_parts(int64 offset) const {
int64 Bitmask::get_ready_parts(int64 offset_part) const {
int64 res = 0;
while (get(offset + res)) {
while (get(offset_part + res)) {
res++;
}
return res;
};
}

std::vector<int32> Bitmask::as_vector() const {
std::vector<int32> res;
for (int32 i = 0; i < narrow_cast<int32>(data_.size() * 8); i++) {
auto size = narrow_cast<int32>(data_.size() * 8);
for (int32 i = 0; i < size; i++) {
if (get(i)) {
res.push_back(i);
}
}
return res;
}
void Bitmask::set(int64 offset) {
auto need_size = narrow_cast<size_t>(offset / 8 + 1);

void Bitmask::set(int64 offset_part) {
CHECK(offset_part >= 0);
auto need_size = narrow_cast<size_t>(offset_part / 8 + 1);
if (need_size > data_.size()) {
data_.resize(need_size, 0);
data_.resize(need_size, '\0');
}
data_[need_size - 1] |= (1 << (offset % 8));
data_[need_size - 1] |= (1 << (offset_part % 8));
}

int64 Bitmask::size() const {
return data_.size() * 8;
return static_cast<int64>(data_.size() * 8);
}

} // namespace td
22 changes: 8 additions & 14 deletions td/telegram/files/FileBitmask.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,40 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once

#include "td/utils/common.h"
#include "td/utils/Slice.h"
#include "td/utils/StringBuilder.h"

namespace td {

class Bitmask {
public:
struct ReadySize {
int64 offset{-1};
int64 ready_size{-1};
bool empty() const {
return offset == -1;
}
};
struct Decode {};
struct Ones {};
Bitmask() = default;
Bitmask(Decode, Slice data);
Bitmask(Ones, int64 count);
std::string encode() const;
ReadySize get_ready_size(int64 offset, int64 part_size) const;
int64 get_ready_prefix_size(int64 offset, int64 part_size, int64 file_size) const;
int64 get_total_size(int64 part_size) const;
bool get(int64 offset) const;
bool get(int64 offset_part) const;

int64 get_ready_parts(int64 offset) const;
int64 get_ready_parts(int64 offset_part) const;

std::vector<int32> as_vector() const;
void set(int64 offset);
void set(int64 offset_part);
int64 size() const;

private:
std::string data_;
};

inline StringBuilder &operator<<(StringBuilder &sb, const Bitmask &mask) {
std::string res;
for (int64 i = 0; i < mask.size(); i++) {
res += mask.get(i) ? "1" : "0";
sb << (mask.get(i) ? '1' : '0');
}
return sb << res;
return sb;
}

} // namespace td
17 changes: 9 additions & 8 deletions td/telegram/files/FileDownloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
res.ready_parts = bitmask.as_vector();
res.use_part_count_limit = false;
res.only_check = only_check_;
res.need_delay = !is_small_ && (remote_.file_type_ == FileType::VideoNote ||
remote_.file_type_ == FileType::VoiceNote || remote_.file_type_ == FileType::Audio ||
remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation ||
(remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20)));
res.need_delay =
!is_small_ && (remote_.file_type_ == FileType::VideoNote || remote_.file_type_ == FileType::Document ||
remote_.file_type_ == FileType::VoiceNote || remote_.file_type_ == FileType::Audio ||
remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation ||
(remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20)));
res.offset = offset_;
return res;
}
Expand Down Expand Up @@ -304,7 +305,6 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
if (encryption_key_.is_secret()) {
padded_size = (part.size + 15) & ~15;
}
LOG(INFO) << "Got " << bytes.size() << " bytes, padded_size = " << padded_size << " for " << path_;
if (bytes.size() > padded_size) {
return Status::Error("Part size is more than requested");
}
Expand Down Expand Up @@ -338,14 +338,15 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {

auto slice = bytes.as_slice().truncate(part.size);
TRY_STATUS(acquire_fd());
LOG(INFO) << "Got " << slice.size() << " bytes at " << part.offset << " for \"" << path_ << '"';
TRY_RESULT(written, fd_.pwrite(slice, part.offset));
// may write less than part.size, when size of downloadable file is unknown
if (written != slice.size()) {
return Status::Error("Failed to save file part to the file");
}
return written;
}
void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask,
void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask,
bool is_ready, int64 ready_size) {
if (is_ready) {
// do not send partial location. will lead to wrong local_size
Expand All @@ -355,7 +356,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_
return;
}
if (encryption_key_.empty() || encryption_key_.is_secure()) {
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, path_, part_size, "", ready_bitmask},
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", ready_bitmask},
ready_size);
} else if (encryption_key_.is_secret()) {
UInt256 iv;
Expand All @@ -365,7 +366,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_
LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_);
}
callback_->on_partial_download(
PartialLocalFileLocation{remote_.file_type_, path_, part_size, as_slice(iv).str(), ready_bitmask}, ready_size);
PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(), ready_bitmask}, ready_size);
} else {
UNREACHABLE();
}
Expand Down
Loading

0 comments on commit ac3fa70

Please sign in to comment.