2018-02-16 17:25:45 -05:00
|
|
|
#include <cassert>
|
|
|
|
#include <cstring>
|
|
|
|
#include "S3ObjectStream.h"
|
2020-09-13 19:41:04 -04:00
|
|
|
#include "amazon/AmazonS3Client.h"
|
2018-02-16 17:25:45 -05:00
|
|
|
#include "Singleton.h"
|
|
|
|
#include "AppConfig.h"
|
2018-02-19 16:46:16 -05:00
|
|
|
#include "PathUtils.h"
|
|
|
|
#include "string_format.h"
|
|
|
|
#include "StdStreamUtils.h"
|
2018-02-27 14:18:17 -05:00
|
|
|
#include "Log.h"
|
2018-02-16 17:25:45 -05:00
|
|
|
|
|
|
|
//Preference name under which the S3 access key id is registered and read back (see CConfig)
#define PREF_S3_OBJECTSTREAM_ACCESSKEYID "s3.objectstream.accesskeyid"
|
|
|
|
//Preference name under which the S3 secret access key is registered and read back (see CConfig)
#define PREF_S3_OBJECTSTREAM_SECRETACCESSKEY "s3.objectstream.secretaccesskey"
|
2018-02-28 09:18:00 -05:00
|
|
|
//Relative path appended to Framework::PathUtils::GetCachePath() to form the chunk cache directory
#define CACHE_PATH "Play Data Files/s3objectstream_cache"
|
2018-02-16 17:25:45 -05:00
|
|
|
|
2018-02-27 14:18:17 -05:00
|
|
|
//Log name passed to CLog::Print for messages emitted by this file
#define LOG_NAME "s3objectstream"
|
|
|
|
|
2019-05-13 12:36:36 -04:00
|
|
|
//Size in bytes (0x40000 = 256 KiB) of the read buffer; the object is fetched and cached
//in chunks aligned on multiples of this value
#define BUFFERSIZE 0x40000
|
|
|
|
|
2018-10-09 13:06:08 -04:00
|
|
|
//Registers the two string preferences that hold the S3 credentials with the
//application configuration. Both are registered with an empty string as their
//second argument (presumably the default value - confirm against CAppConfig).
CS3ObjectStream::CConfig::CConfig()
{
	auto& appConfig = CAppConfig::GetInstance();
	appConfig.RegisterPreferenceString(PREF_S3_OBJECTSTREAM_ACCESSKEYID, "");
	appConfig.RegisterPreferenceString(PREF_S3_OBJECTSTREAM_SECRETACCESSKEY, "");
}
|
2018-02-16 17:25:45 -05:00
|
|
|
|
2021-04-22 19:06:45 -04:00
|
|
|
//Builds a CAmazonCredentials value from the access key id and secret access key
//currently stored in the application preferences.
CAmazonCredentials CS3ObjectStream::CConfig::GetCredentials()
{
	auto& appConfig = CAppConfig::GetInstance();

	CAmazonCredentials result;
	result.accessKeyId = appConfig.GetPreferenceString(PREF_S3_OBJECTSTREAM_ACCESSKEYID);
	result.secretAccessKey = appConfig.GetPreferenceString(PREF_S3_OBJECTSTREAM_SECRETACCESSKEY);
	return result;
}
|
2018-02-16 17:25:45 -05:00
|
|
|
|
2021-04-22 19:06:45 -04:00
|
|
|
//Creates a read-only stream over the object 'objectKey' stored in bucket 'bucketName'.
//Construction allocates the BUFFERSIZE read buffer, makes sure the cache directory
//exists and then queries S3 for the bucket region and object information
//(see GetObjectInfo), so it may perform network requests.
CS3ObjectStream::CS3ObjectStream(const char* bucketName, const char* objectKey)
    : m_bucketName(bucketName)
    , m_objectKey(objectKey)
{
	//Buffer always has room for one full chunk
	m_buffer.resize(BUFFERSIZE);

	//Cache directory must exist before any chunk is written to it
	const auto cachePath = GetCachePath();
	Framework::PathUtils::EnsurePathExists(cachePath);

	GetObjectInfo();
}
|
|
|
|
|
|
|
|
//Reads up to 'size' bytes from the current object position into 'buffer'.
//The request is clamped to the number of bytes left in the object and the number
//of bytes actually copied is returned (0 once the end of the object is reached),
//so callers can detect short reads.
//Data is served from m_buffer, which SyncBuffer refills whenever the read position
//leaves the BUFFERSIZE-aligned chunk the buffer currently holds. Anything thrown by
//SyncBuffer propagates to the caller.
uint64 CS3ObjectStream::Read(void* buffer, uint64 size)
{
	assert(m_objectPosition <= m_objectSize);

	//Also guards release builds against the unsigned underflow below
	//if the position was moved past the end of the object
	if(m_objectPosition >= m_objectSize)
	{
		return 0;
	}

	const uint64 readSize = std::min(size, m_objectSize - m_objectPosition);
	uint64 remaining = readSize;
	auto outBuffer = reinterpret_cast<uint8*>(buffer);

	while(remaining != 0)
	{
		//Refill the buffer if the read position is outside the buffered chunk
		if((m_bufferPosition / BUFFERSIZE) != (m_objectPosition / BUFFERSIZE))
		{
			SyncBuffer();
			continue;
		}

		//Copy as much as possible out of the buffered chunk
		const uint64 bufferOffset = m_objectPosition % BUFFERSIZE;
		const uint64 bufferRemain = BUFFERSIZE - bufferOffset;
		assert(bufferRemain <= BUFFERSIZE);

		const uint64 copySize = std::min(bufferRemain, remaining);
		assert(copySize <= remaining);

		memcpy(outBuffer, m_buffer.data() + bufferOffset, static_cast<size_t>(copySize));
		m_objectPosition += copySize;
		outBuffer += copySize;
		remaining -= copySize;
	}

	assert(m_objectPosition <= m_objectSize);
	return readSize;
}
|
|
|
|
|
|
|
|
//This stream is read-only: writing is not implemented and always throws
//std::runtime_error. Both parameters are ignored.
uint64 CS3ObjectStream::Write(const void* /*buffer*/, uint64 /*size*/)
{
	throw std::runtime_error("Not supported.");
}
|
|
|
|
|
|
|
|
//Moves the read position.
// - STREAM_SEEK_SET: 'offset' is an absolute position from the start of the object.
// - STREAM_SEEK_CUR: 'offset' is added to the current position.
// - STREAM_SEEK_END: 'offset' is applied relative to the end of the object
//   (an offset of 0 moves to the end, negative values move backwards from it).
//No data is fetched here; the buffer is refilled lazily by the next Read.
//The resulting position is expected to stay within [0, object size], which is
//checked in debug builds.
void CS3ObjectStream::Seek(int64 offset, Framework::STREAM_SEEK_DIRECTION direction)
{
	switch(direction)
	{
	case Framework::STREAM_SEEK_SET:
		//A negative absolute position would wrap around to a huge unsigned value
		assert(offset >= 0);
		m_objectPosition = static_cast<uint64>(offset);
		break;
	case Framework::STREAM_SEEK_CUR:
		//Unsigned wrap-around yields the expected result for negative offsets
		m_objectPosition += static_cast<uint64>(offset);
		break;
	case Framework::STREAM_SEEK_END:
		m_objectPosition = m_objectSize + static_cast<uint64>(offset);
		break;
	}
	assert(m_objectPosition <= m_objectSize);
}
|
|
|
|
|
|
|
|
//Returns the current read position, in bytes from the start of the object.
uint64 CS3ObjectStream::Tell()
{
	return m_objectPosition;
}
|
|
|
|
|
|
|
|
bool CS3ObjectStream::IsEOF()
|
|
|
|
{
|
|
|
|
return (m_objectPosition == m_objectSize);
|
|
|
|
}
|
|
|
|
|
2019-10-16 20:51:11 -04:00
|
|
|
//Returns the directory in which downloaded chunks are cached:
//CACHE_PATH appended to the path returned by Framework::PathUtils::GetCachePath().
fs::path CS3ObjectStream::GetCachePath()
{
	const auto basePath = Framework::PathUtils::GetCachePath();
	return basePath / CACHE_PATH;
}
|
|
|
|
|
|
|
|
std::string CS3ObjectStream::GenerateReadCacheKey(const std::pair<uint64, uint64>& range) const
|
|
|
|
{
|
2018-03-07 09:07:43 -05:00
|
|
|
return string_format("%s-%llu-%llu", m_objectEtag.c_str(), range.first, range.second);
|
2018-02-19 16:46:16 -05:00
|
|
|
}
|
|
|
|
|
2018-02-26 09:08:57 -05:00
|
|
|
//Removes at most one leading and at most one trailing double quote from 'input'
//and returns the result. Quotes elsewhere in the string are left untouched.
//The leading quote is handled first, so a string made of a single quote becomes empty.
static std::string TrimQuotes(std::string input)
{
	//Work in place on the by-value parameter instead of rebuilding the string
	if(!input.empty() && (input.front() == '"'))
	{
		input.erase(input.begin());
	}
	if(!input.empty() && (input.back() == '"'))
	{
		input.pop_back();
	}
	return input;
}
|
|
|
|
|
2018-02-16 17:25:45 -05:00
|
|
|
void CS3ObjectStream::GetObjectInfo()
|
|
|
|
{
|
2018-02-27 13:08:45 -05:00
|
|
|
//Obtain bucket region
|
2018-02-16 17:25:45 -05:00
|
|
|
{
|
2021-04-22 19:06:45 -04:00
|
|
|
CAmazonS3Client client(CConfig::GetInstance().GetCredentials());
|
2018-02-16 17:25:45 -05:00
|
|
|
|
|
|
|
GetBucketLocationRequest request;
|
2018-02-27 13:08:45 -05:00
|
|
|
request.bucket = m_bucketName;
|
2018-02-16 17:25:45 -05:00
|
|
|
|
2018-02-27 13:08:45 -05:00
|
|
|
auto result = client.GetBucketLocation(request);
|
|
|
|
m_bucketRegion = result.locationConstraint;
|
2018-02-16 17:25:45 -05:00
|
|
|
}
|
|
|
|
|
2018-02-27 13:08:45 -05:00
|
|
|
//Obtain object info
|
|
|
|
{
|
2021-04-22 19:06:45 -04:00
|
|
|
CAmazonS3Client client(CConfig::GetInstance().GetCredentials(), m_bucketRegion);
|
2018-02-27 13:08:45 -05:00
|
|
|
|
|
|
|
HeadObjectRequest request;
|
|
|
|
request.bucket = m_bucketName;
|
2021-04-22 19:06:45 -04:00
|
|
|
request.key = m_objectKey;
|
2018-02-27 13:08:45 -05:00
|
|
|
|
|
|
|
auto objectHeader = client.HeadObject(request);
|
|
|
|
m_objectSize = objectHeader.contentLength;
|
|
|
|
m_objectEtag = TrimQuotes(objectHeader.etag);
|
|
|
|
}
|
2018-02-16 17:25:45 -05:00
|
|
|
}
|
2019-05-13 12:36:36 -04:00
|
|
|
|
|
|
|
void CS3ObjectStream::SyncBuffer()
|
|
|
|
{
|
|
|
|
m_bufferPosition = (m_objectPosition / BUFFERSIZE) * BUFFERSIZE;
|
|
|
|
|
|
|
|
uint64 size = std::min<uint64>(BUFFERSIZE, m_objectSize - m_bufferPosition);
|
|
|
|
auto range = std::make_pair(m_bufferPosition, m_bufferPosition + size - 1);
|
|
|
|
auto readCacheFilePath = GetCachePath() / GenerateReadCacheKey(range);
|
|
|
|
|
|
|
|
#ifdef _TRACEGET
|
|
|
|
static FILE* output = fopen("getobject.log", "wb");
|
|
|
|
fprintf(output, "%ld,%ld,%ld\r\n", range.first, range.second, size);
|
|
|
|
fflush(output);
|
|
|
|
#endif
|
|
|
|
|
|
|
|
bool cachedReadSucceeded =
|
|
|
|
[&]() {
|
|
|
|
try
|
|
|
|
{
|
2019-10-16 20:51:11 -04:00
|
|
|
if(fs::exists(readCacheFilePath))
|
2019-05-13 12:36:36 -04:00
|
|
|
{
|
|
|
|
auto readCacheFileStream = Framework::CreateInputStdStream(readCacheFilePath.native());
|
2023-05-02 08:56:25 -04:00
|
|
|
FRAMEWORK_MAYBE_UNUSED auto cacheRead = readCacheFileStream.Read(m_buffer.data(), size);
|
2019-05-13 12:36:36 -04:00
|
|
|
assert(cacheRead == size);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch(const std::exception& exception)
|
|
|
|
{
|
|
|
|
//Not a problem if we failed to read cache
|
|
|
|
CLog::GetInstance().Print(LOG_NAME, "Failed to read cache: '%s'.\r\n", exception.what());
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}();
|
|
|
|
|
|
|
|
if(!cachedReadSucceeded)
|
|
|
|
{
|
|
|
|
assert(size > 0);
|
2021-04-22 19:06:45 -04:00
|
|
|
CAmazonS3Client client(CConfig::GetInstance().GetCredentials(), m_bucketRegion);
|
2019-05-13 12:36:36 -04:00
|
|
|
GetObjectRequest request;
|
2021-04-22 19:06:45 -04:00
|
|
|
request.key = m_objectKey;
|
2019-05-13 12:36:36 -04:00
|
|
|
request.bucket = m_bucketName;
|
|
|
|
request.range = range;
|
|
|
|
auto objectContent = client.GetObject(request);
|
|
|
|
assert(objectContent.data.size() == size);
|
|
|
|
memcpy(m_buffer.data(), objectContent.data.data(), size);
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
auto readCacheFileStream = Framework::CreateOutputStdStream(readCacheFilePath.native());
|
|
|
|
readCacheFileStream.Write(objectContent.data.data(), size);
|
|
|
|
}
|
|
|
|
catch(const std::exception& exception)
|
|
|
|
{
|
|
|
|
//Not a problem if we failed to write cache
|
|
|
|
CLog::GetInstance().Print(LOG_NAME, "Failed to write cache: '%s'.\r\n", exception.what());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|