Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ gem 'hashdiff'
gem 'httpclient'
gem 'json-diff'
gem 'json-schema'
gem 'loggregator_emitter', '~> 5.0'
gem 'mime-types', '~> 3.7'
gem 'multipart-parser'
gem 'netaddr', '>= 2.0.4'
Expand Down
4 changes: 0 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ GEM
ms_rest_azure (~> 0.7.0)
backport (1.2.0)
base64 (0.3.0)
beefcake (1.0.0)
benchmark (0.5.0)
bigdecimal (4.1.1)
bit-struct (0.17)
Expand Down Expand Up @@ -284,8 +283,6 @@ GEM
rb-fsevent (~> 0.10, >= 0.10.3)
rb-inotify (~> 0.9, >= 0.9.10)
logger (1.7.0)
loggregator_emitter (5.2.0)
beefcake (~> 1.0.0)
loofah (2.25.1)
crass (~> 1.0.2)
nokogiri (>= 1.12.0)
Expand Down Expand Up @@ -636,7 +633,6 @@ DEPENDENCIES
json-diff
json-schema
listen
loggregator_emitter (~> 5.0)
machinist (~> 1.0.6)
mime-types (~> 3.7)
mock_redis
Expand Down
3 changes: 1 addition & 2 deletions config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ log_audit_events: false
telemetry_log_path: spec/artifacts/cloud_controller_telemetry.log

loggregator:
router: "127.0.0.1:3456"
internal_url: 'http://loggregator-trafficcontroller.service.cf.internal:8081'
endpoint: "127.0.0.1:3456"

logcache:
host: 'http://doppler.service.cf.internal'
Expand Down
6 changes: 4 additions & 2 deletions lib/cloud_controller/config_schemas/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,10 @@ class ApiSchema < VCAP::Config
},

optional(:loggregator) => {
router: String,
internal_url: String
endpoint: String,
optional(:ca_file) => String,
optional(:cert_file) => String,
optional(:key_file) => String
},

optional(:fluent) => {
Expand Down
5 changes: 4 additions & 1 deletion lib/cloud_controller/config_schemas/clock_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ class ClockSchema < VCAP::Config
optional(:uaa_client_scope) => String,

optional(:loggregator) => {
router: String
endpoint: String,
optional(:ca_file) => String,
optional(:cert_file) => String,
optional(:key_file) => String
},

optional(:fluent) => {
Expand Down
5 changes: 4 additions & 1 deletion lib/cloud_controller/config_schemas/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ class WorkerSchema < VCAP::Config
},

optional(:loggregator) => {
router: String
endpoint: String,
optional(:ca_file) => String,
optional(:cert_file) => String,
optional(:key_file) => String
},

optional(:fluent) => {
Expand Down
20 changes: 12 additions & 8 deletions lib/cloud_controller/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'cloud_controller/uaa/uaa_token_decoder'
require 'cloud_controller/uaa/uaa_verification_keys'
require 'app_log_emitter'
require 'loggregator_emitter'
require 'loggregator_emitter/client'
require 'fluent_emitter'
require 'cloud_controller/rack_app_builder'
require 'cloud_controller/metrics/periodic_updater'
Expand Down Expand Up @@ -174,16 +174,20 @@ def setup_blobstore
end

def setup_app_log_emitter
VCAP::AppLogEmitter.logger = logger
VCAP::AppLogEmitter.fluent_emitter = fluent_emitter if @config.get(:fluent)

if @config.get(:loggregator) && @config.get(
:loggregator, :router
)
VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(@config.get(:loggregator, :router), 'cloud_controller', 'API',
@config.get(:index))
end
return unless @config.get(:loggregator) && @config.get(:loggregator, :endpoint)

VCAP::AppLogEmitter.logger = logger
VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new(
endpoint: @config.get(:loggregator, :endpoint),
origin: 'cloud_controller',
source_type: 'API',
instance_id: @config.get(:index),
ca_cert_file: @config.get(:loggregator, :ca_file),
client_cert_file: @config.get(:loggregator, :cert_file),
client_key_file: @config.get(:loggregator, :key_file)
)
end

def fluent_emitter
Expand Down
20 changes: 13 additions & 7 deletions lib/delayed_job/delayed_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'puma'
require 'prometheus/middleware/exporter'
require 'cloud_controller/standalone_metrics_webserver'
require 'loggregator_emitter/client'

class CloudController::DelayedWorker
DEFAULT_READ_AHEAD_POSTGRES = 0
Expand Down Expand Up @@ -96,15 +97,20 @@ def get_initialized_delayed_worker(config, logger)
end

def setup_app_log_emitter(config, logger)
VCAP::AppLogEmitter.logger = logger
VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent)
if config.get(:loggregator) && config.get(
:loggregator, :router
)
VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API',
config.get(:index))
end

VCAP::AppLogEmitter.logger = logger
return unless config.get(:loggregator) && config.get(:loggregator, :endpoint)

VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new(
endpoint: config.get(:loggregator, :endpoint),
origin: 'cloud_controller',
source_type: 'API',
instance_id: config.get(:index),
ca_cert_file: config.get(:loggregator, :ca_file),
client_cert_file: config.get(:loggregator, :cert_file),
client_key_file: config.get(:loggregator, :key_file)
)
end

def fluent_emitter(config)
Expand Down
2 changes: 1 addition & 1 deletion lib/loggregator-api/v2/envelope_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions lib/loggregator-api/v2/ingress_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions lib/loggregator-api/v2/ingress_services_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions lib/loggregator_emitter/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require 'grpc'
require 'loggregator-api/v2/ingress_services_pb'

module LoggregatorEmitter
class Client
def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil)
raise ArgumentError.new('Must provide a valid endpoint') if endpoint.nil? || endpoint.empty?
raise ArgumentError.new('Must provide a valid origin') unless origin
raise ArgumentError.new('Must provide a valid source_type') unless source_type

@endpoint = endpoint
@credentials = build_credentials(ca_cert_file, client_cert_file, client_key_file)
@default_tags = { 'origin' => origin, 'source_type' => source_type }
@instance_id = instance_id && instance_id.to_s
end

def emit(app_id, message, tags={})
envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::OUT, tags)
stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope]))
end

def emit_error(app_id, message, tags={})
envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::ERR, tags)
stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope]))
end

private

def stub
@stub ||= Loggregator::V2::Ingress::Stub.new(@endpoint, @credentials)
end

def create_envelope(app_id, message, type, tags)
Loggregator::V2::Envelope.new(
source_id: app_id,
instance_id: @instance_id,
timestamp: Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond),
log: Loggregator::V2::Log.new(
payload: message.encode('UTF-8'),
type: type
),
tags: @default_tags.merge(tags.transform_keys(&:to_s).transform_values(&:to_s))
)
end

def build_credentials(ca_cert_file, client_cert_file, client_key_file)
return :this_channel_is_insecure unless ca_cert_file && client_cert_file && client_key_file

GRPC::Core::ChannelCredentials.new(
File.read(ca_cert_file),
File.read(client_key_file),
File.read(client_cert_file)
)
end
end
end
18 changes: 9 additions & 9 deletions spec/integration/app_log_emitter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
require 'spec_helper'
require 'securerandom'
require 'tempfile'

RSpec.describe 'Cloud controller Loggregator Integration', type: :integration do
before(:all) do
@loggregator_server = FakeLoggregatorServer.new(12_345)
@loggregator_server.start

@authed_headers = {
'Authorization' => "bearer #{admin_token}",
'Accept' => 'application/json',
Expand All @@ -18,13 +14,17 @@
config = VCAP::CloudController::YAMLConfig.safe_load_file(base_cc_config_file).deep_merge(
VCAP::CloudController::YAMLConfig.safe_load_file(port_8181_overrides)
)
config['loggregator'] = { 'endpoint' => 'localhost:12345' }

@cc_config_file = Tempfile.new('cc_config.yml')
@cc_config_file.write(YAML.dump(config))
@cc_config_file.close

start_cc(debug: false, config: @cc_config_file.path)

@loggregator_server = FakeLoggregatorServer.new(12_345)
@loggregator_server.start

org = org_with_default_quota(@authed_headers)
org_guid = org.json_body['metadata']['guid']

Expand All @@ -43,7 +43,7 @@
@loggregator_server.stop
end

it 'send logs to the loggregator' do
it 'sends logs to the loggregator' do
app = make_post_request('/v2/apps',
{
'name' => 'foo_app',
Expand All @@ -57,9 +57,9 @@
expect(messages.size).to eq(1)

message = messages.first
expect(message.message).to eq "Created app with guid #{app_id}"
expect(message.app_id).to eq app_id
expect(message.source_type).to eq 'API'
expect(message.message_type).to eq LogMessage::MessageType::OUT
expect(message.source_id).to eq(app_id)
expect(message.log.type).to eq(:OUT)
expect(message.log.payload).to eq("Created app with guid #{app_id}")
expect(message.tags['source_type']).to eq('API')
end
end
48 changes: 29 additions & 19 deletions spec/support/fake_loggregator_server.rb
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
require 'socket'
require 'sonde'
require 'grpc'
require 'loggregator-api/v2/ingress_services_pb'

class FakeLoggregatorServer
attr_reader :messages, :port, :sock
attr_reader :port

def initialize(port)
@messages = []
@port = port
@sock = UDPSocket.new
@envelopes = []
@mutex = Mutex.new
end

def start
@sock.bind('localhost', port)

@thread = Thread.new do
loop do
stuff = @sock.recv(65_536)
envelope = ::Sonde::Envelope.decode(stuff)
messages << envelope.logMessage
rescue Beefcake::Message::WrongTypeError, Beefcake::Message::RequiredFieldNotSetError, Beefcake::Message::InvalidValueError => e
puts 'ERROR'
puts e
end
end
service = FakeIngressService.new(@envelopes, @mutex)
@server = GRPC::RpcServer.new
@server.add_http2_port("localhost:#{@port}", :this_port_is_insecure)
@server.handle(service)
@thread = Thread.new { @server.run }
@server.wait_till_running
end

def stop
Thread.kill(@thread)
@sock.close
@server.stop
@thread.join
end

def messages
@mutex.synchronize { @envelopes.flat_map { |batch| batch.batch.to_a } }
end

class FakeIngressService < Loggregator::V2::Ingress::Service
def initialize(envelopes, mutex)
@envelopes = envelopes
@mutex = mutex
end

def send(envelope_batch, _call)
@mutex.synchronize { @envelopes << envelope_batch }
Loggregator::V2::SendResponse.new
end
end
end
Loading
Loading