Commit defccf0e authored by Jari Sundell's avatar Jari Sundell Committed by GitHub

Merge pull request #117 from rakshasa/fix-dht

Fix dht
parents 382fe39e 6cd3fd38
......@@ -82,6 +82,8 @@ public:
bool is_bindable() const;
bool is_address_any() const;
bool is_valid_inet_class() const { return family() == af_inet || family() == af_inet6; }
// Should we need to set AF_UNSPEC?
void clear() { std::memset(this, 0, sizeof(socket_address)); set_family(); }
......@@ -339,11 +341,10 @@ socket_address::pretty_address_str() const {
return sa_inet()->address_str();
case af_inet6:
return sa_inet6()->address_str();
case af_unspec:
return std::string("unspec");
default:
if (port() == 0)
return std::string("no family");
else
return std::string("no family with port");
return std::string("invalid");
}
}
......
......@@ -39,11 +39,15 @@
#include "torrent/exceptions.h"
#include "torrent/object.h"
#include "torrent/utils/log.h"
#include "net/address_list.h" // For SA.
#include "dht_node.h"
#define LT_LOG_THIS(log_fmt, ...) \
lt_log_print_hash(torrent::LOG_DHT_NODE, this->id(), "dht_node", log_fmt, __VA_ARGS__);
namespace torrent {
DhtNode::DhtNode(const HashString& id, const rak::socket_address* sa) :
......@@ -54,9 +58,13 @@ DhtNode::DhtNode(const HashString& id, const rak::socket_address* sa) :
m_recentlyInactive(0),
m_bucket(NULL) {
if (sa->family() != rak::socket_address::af_inet &&
(sa->family() != rak::socket_address::af_inet6 || !sa->sa_inet6()->is_any()))
throw resource_error("Addres not af_inet or in6addr_any");
// TODO: Change this to use the id hash similar to how peer info
// hash'es are logged.
LT_LOG_THIS("created (address:%s)", sa->pretty_address_str().c_str());
// if (sa->family() != rak::socket_address::af_inet &&
// (sa->family() != rak::socket_address::af_inet6 || !sa->sa_inet6()->is_any()))
// throw resource_error("Address not af_inet or in6addr_any");
}
DhtNode::DhtNode(const std::string& id, const Object& cache) :
......@@ -65,11 +73,17 @@ DhtNode::DhtNode(const std::string& id, const Object& cache) :
m_recentlyInactive(0),
m_bucket(NULL) {
// TODO: Check how DHT handles inet6.
rak::socket_address_inet* sa = m_socketAddress.sa_inet();
sa->set_family();
sa->set_address_h(cache.get_key_value("i"));
sa->set_port(cache.get_key_value("p"));
m_lastSeen = cache.get_key_value("t");
LT_LOG_THIS("initializing (address:%s)", sa->address_str().c_str());
update();
}
......
......@@ -43,6 +43,8 @@
#include "torrent/dht_manager.h"
#include "torrent/download_info.h"
#include "torrent/exceptions.h"
#include "torrent/utils/log.h"
#include "utils/sha1.h"
#include "manager.h"
......@@ -51,6 +53,9 @@
#include "dht_tracker.h"
#include "dht_transaction.h"
#define LT_LOG_THIS(log_fmt, ...) \
lt_log_print_hash(torrent::LOG_DHT_ROUTER, this->id(), "dht_router", log_fmt, __VA_ARGS__);
namespace torrent {
HashString DhtRouter::zero_id;
......@@ -88,12 +93,16 @@ DhtRouter::DhtRouter(const Object& cache, const rak::socket_address* sa) :
sha.final_c(data());
}
LT_LOG_THIS("creating (address:%s)", sa->pretty_address_str().c_str());
set_bucket(new DhtBucket(zero_id, ones_id));
m_routingTable.insert(std::make_pair(bucket()->id_range_end(), bucket()));
if (cache.has_key("nodes")) {
const Object::map_type& nodes = cache.get_key_map("nodes");
LT_LOG_THIS("adding nodes (size:%zu)", nodes.size());
for (Object::map_type::const_iterator itr = nodes.begin(); itr != nodes.end(); ++itr) {
if (itr->first.length() != HashString::size_data)
throw bencode_error("Loading cache: Invalid node hash.");
......@@ -134,6 +143,8 @@ DhtRouter::~DhtRouter() {
void
DhtRouter::start(int port) {
LT_LOG_THIS("starting (port:%d)", port);
m_server.start(port);
// Set timeout slot and schedule it to be called immediately for initial bootstrapping if necessary.
......
......@@ -49,6 +49,7 @@
#include "torrent/poll.h"
#include "torrent/object_static_map.h"
#include "torrent/throttle.h"
#include "torrent/utils/log.h"
#include "tracker/tracker_dht.h"
#include "dht_bucket.h"
......@@ -57,6 +58,9 @@
#include "manager.h"
#define LT_LOG_THIS(log_fmt, ...) \
lt_log_print_subsystem(torrent::LOG_DHT_SERVER, "dht_server", log_fmt, __VA_ARGS__);
namespace torrent {
const char* DhtServer::queries[] = {
......@@ -144,8 +148,15 @@ DhtServer::start(int port) {
throw resource_error("Could not set listening port to reuse address.");
rak::socket_address sa = *m_router->address();
if (sa.family() == rak::socket_address::af_unspec)
sa.sa_inet6()->clear();
sa.set_port(port);
LT_LOG_THIS("starting (address:%s)", sa.pretty_address_str().c_str());
// Figure out how to bind to both inet and inet6.
if (!get_fd().bind(sa))
throw resource_error("Could not bind datagram socket.");
......@@ -173,6 +184,8 @@ DhtServer::stop() {
if (!is_active())
return;
LT_LOG_THIS("stopping", 0);
clear_transactions();
priority_queue_erase(&taskScheduler, &m_taskTimeout);
......
......@@ -84,6 +84,17 @@ SocketFd::set_reuse_address(bool state) {
return setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0;
}
bool
SocketFd::set_ipv6_v6only(bool state) {
check_valid();
if (!m_ipv6_socket)
return false;
int opt = state;
return setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, &opt, sizeof(opt)) == 0;
}
bool
SocketFd::set_send_buffer_size(uint32_t s) {
check_valid();
......@@ -124,8 +135,12 @@ SocketFd::open_stream() {
m_ipv6_socket = true;
int zero = 0;
return setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero)) != -1;
if (!set_ipv6_v6only(false)) {
close();
return false;
}
return true;
}
bool
......@@ -139,8 +154,12 @@ SocketFd::open_datagram() {
m_ipv6_socket = true;
int zero = 0;
return setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero)) != -1;
if (!set_ipv6_v6only(false)) {
close();
return false;
}
return true;
}
bool
......
......@@ -59,6 +59,7 @@ public:
bool set_nonblock();
bool set_reuse_address(bool state);
bool set_ipv6_v6only(bool state);
bool set_priority(priority_type p);
......
......@@ -38,12 +38,16 @@
#include <torrent/exceptions.h>
#include <torrent/throttle.h>
#include <torrent/utils/log.h>
#include "manager.h"
#include "dht/dht_router.h"
#include "dht_manager.h"
#define LT_LOG_THIS(log_fmt, ...) \
lt_log_print_subsystem(torrent::LOG_DHT_MANAGER, "dht_manager", log_fmt, __VA_ARGS__);
namespace torrent {
DhtManager::~DhtManager() {
......@@ -53,26 +57,46 @@ DhtManager::~DhtManager() {
void
DhtManager::initialize(const Object& dhtCache) {
auto bind_address = rak::socket_address::cast_from(manager->connection_manager()->bind_address());
LT_LOG_THIS("initializing (bind_address:%s)", bind_address->pretty_address_str().c_str());
if (m_router != NULL)
throw internal_error("DhtManager::initialize called with DHT already active.");
m_router = new DhtRouter(dhtCache, rak::socket_address::cast_from(manager->connection_manager()->bind_address()));
try {
m_router = new DhtRouter(dhtCache, bind_address);
} catch (torrent::local_error& e) {
LT_LOG_THIS("initialization failed (error:%s)", e.what());
}
}
void
bool
DhtManager::start(port_type port) {
LT_LOG_THIS("starting (port:%d)", port);
if (m_router == NULL)
throw internal_error("DhtManager::start called without initializing first.");
m_port = port;
m_router->start(port);
}
try {
m_router->start(port);
} catch (torrent::local_error& e) {
LT_LOG_THIS("start failed (error:%s)", e.what());
return false;
}
return true;
}
void
DhtManager::stop() {
if (m_router != NULL)
m_router->stop();
if (m_router == NULL)
return;
LT_LOG_THIS("stopping", 0);
m_router->stop();
}
bool
......
......@@ -82,7 +82,7 @@ public:
void initialize(const Object& dhtCache);
void start(port_type port);
bool start(port_type port);
void stop();
// Store DHT cache in the given container and return the container.
......
......@@ -173,9 +173,13 @@ log_group::internal_print(const HashString* hash, const char* subsystem, const v
char buffer[buffer_size];
char* first = buffer;
if (hash != NULL && subsystem != NULL) {
first = hash_string_to_hex(*hash, first);
first += snprintf(first, 4096 - (first - buffer), "->%s: ", subsystem);
if (subsystem != NULL) {
if (hash != NULL) {
first = hash_string_to_hex(*hash, first);
first += snprintf(first, 4096 - (first - buffer), "->%s: ", subsystem);
} else {
first += snprintf(first, 4096 - (first - buffer), "%s: ", subsystem);
}
}
va_start(ap, fmt);
......@@ -205,6 +209,7 @@ log_group::internal_print(const HashString* hash, const char* subsystem, const v
}
#define LOG_CASCADE(parent) LOG_CHILDREN_CASCADE(parent, parent)
#define LOG_LINK(parent, child) log_children.push_back(std::make_pair(parent, child))
#define LOG_CHILDREN_CASCADE(parent, subgroup) \
log_children.push_back(std::make_pair(parent + LOG_ERROR, subgroup + LOG_CRITICAL)); \
......@@ -243,6 +248,11 @@ log_initialize() {
LOG_CHILDREN_CASCADE(LOG_CRITICAL, LOG_TRACKER_CRITICAL);
LOG_CHILDREN_CASCADE(LOG_CRITICAL, LOG_TORRENT_CRITICAL);
LOG_LINK(LOG_DHT_ALL, LOG_DHT_MANAGER);
LOG_LINK(LOG_DHT_ALL, LOG_DHT_NODE);
LOG_LINK(LOG_DHT_ALL, LOG_DHT_ROUTER);
LOG_LINK(LOG_DHT_ALL, LOG_DHT_SERVER);
std::sort(log_children.begin(), log_children.end());
log_rebuild_cache();
......@@ -401,9 +411,9 @@ log_open_file_output(const char* name, const char* filename) {
throw input_error("Could not open log file '" + std::string(filename) + "'.");
log_open_output(name, std::bind(&log_file_write, outfile,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
}
void
......@@ -417,9 +427,9 @@ log_open_gz_file_output(const char* name, const char* filename) {
// throw input_error("Could not set gzip log file buffer size.");
log_open_output(name, std::bind(&log_gz_file_write, outfile,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
}
log_buffer*
......@@ -428,9 +438,9 @@ log_open_log_buffer(const char* name) {
try {
log_open_output(name, std::bind(&log_buffer::lock_and_push_log, buffer,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
return buffer;
} catch (torrent::input_error& e) {
......
......@@ -113,10 +113,15 @@ enum {
LOG_NON_CASCADING,
LOG_DHT_ALL,
LOG_DHT_MANAGER,
LOG_DHT_NODE,
LOG_DHT_ROUTER,
LOG_DHT_SERVER,
LOG_INSTRUMENTATION_MEMORY,
LOG_INSTRUMENTATION_MINCORE,
LOG_INSTRUMENTATION_CHOKE,
LOG_INSTRUMENTATION_POLLING,
LOG_INSTRUMENTATION_TRANSFERS,
......@@ -155,10 +160,18 @@ enum {
if (torrent::log_groups[log_group].valid()) \
torrent::log_groups[log_group].internal_print(NULL, NULL, log_dump_data, log_dump_size, __VA_ARGS__); \
#define lt_log_print_hash(log_group, log_hash, log_subsystem, ...) \
if (torrent::log_groups[log_group].valid()) \
torrent::log_groups[log_group].internal_print(&log_hash, log_subsystem, NULL, 0, __VA_ARGS__);
#define lt_log_print_info_dump(log_group, log_dump_data, log_dump_size, log_info, log_subsystem, ...) \
if (torrent::log_groups[log_group].valid()) \
torrent::log_groups[log_group].internal_print(&log_info->hash(), log_subsystem, log_dump_data, log_dump_size, __VA_ARGS__); \
#define lt_log_print_subsystem(log_group, log_subsystem, ...) \
if (torrent::log_groups[log_group].valid()) \
torrent::log_groups[log_group].internal_print(NULL, log_subsystem, NULL, 0, __VA_ARGS__);
class log_buffer;
typedef std::function<void (const char*, unsigned int, int)> log_slot;
......
......@@ -190,6 +190,12 @@ const char* option_list_log_group[] = {
"__non_cascading__",
"dht_all",
"dht_manager",
"dht_node",
"dht_router",
"dht_server",
"instrumentation_memory",
"instrumentation_mincore",
"instrumentation_choke",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment